Files
autocoder/server/routers/schedules.py
Auto 0736b5ec6b fix: address critical issues in PR #75 agent scheduling feature
This commit fixes several issues identified in the agent scheduling
feature from PR #75:

Frontend Fixes:
- Add day boundary handling in timeUtils.ts for timezone conversions
- Add utcToLocalWithDayShift/localToUTCWithDayShift functions
- Add shiftDaysForward/shiftDaysBackward helpers for bitfield adjustment
- Update ScheduleModal to correctly adjust days_of_week when crossing
  day boundaries during UTC conversion (fixes schedules running on
  wrong days for users in extreme timezones like UTC+9)

Backend Fixes:
- Add MAX_SCHEDULES_PER_PROJECT (50) limit to prevent resource exhaustion
- Wire up crash recovery callback in scheduler_service._start_agent()
- Convert schedules.py endpoints to use context manager for DB sessions
- Fix race condition in override creation with atomic delete-then-create
- Replace deprecated datetime.utcnow with datetime.now(timezone.utc)
- Add DB-level CHECK constraints for Schedule model fields

Files Modified:
- api/database.py: Add _utc_now helper, CheckConstraint imports, constraints
- progress.py: Replace deprecated datetime.utcnow
- server/routers/schedules.py: Add context manager, schedule limits
- server/services/assistant_database.py: Replace deprecated datetime.utcnow
- server/services/scheduler_service.py: Wire crash recovery, fix race condition
- ui/src/components/ScheduleModal.tsx: Use day shift functions
- ui/src/lib/timeUtils.ts: Add day boundary handling functions

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-22 08:35:57 +02:00

409 lines
14 KiB
Python

"""
Schedules Router
================
API endpoints for managing agent schedules.
Provides CRUD operations for time-based schedule configuration.
"""
import re
import sys
from contextlib import contextmanager
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Generator, Tuple
from fastapi import APIRouter, HTTPException
from sqlalchemy.orm import Session
# Cap on the number of schedules a single project may hold, to prevent
# resource exhaustion. create_schedule answers HTTP 400 once a project
# already has this many schedules.
MAX_SCHEDULES_PER_PROJECT = 50
from ..schemas import (
NextRunResponse,
ScheduleCreate,
ScheduleListResponse,
ScheduleResponse,
ScheduleUpdate,
)
def _get_project_path(project_name: str) -> Path:
    """Resolve a project's path through the ``registry`` module.

    The directory three levels above this file is put at the front of
    ``sys.path`` (if it is not already listed) so that ``registry`` can be
    imported, then the lookup is delegated to ``registry.get_project_path``.

    Args:
        project_name: Name of the project to look up.

    Returns:
        Whatever ``registry.get_project_path`` returns for the name.
        NOTE(review): callers in this module test the result for falsiness,
        so it is presumably ``None`` for unknown projects -- confirm.
    """
    repo_root = str(Path(__file__).parent.parent.parent)
    if repo_root not in sys.path:
        sys.path.insert(0, repo_root)

    # Imported lazily: only resolvable once the root is on sys.path.
    from registry import get_project_path

    return get_project_path(project_name)
# Every endpoint in this module is mounted under the owning project's URL and
# grouped under the "schedules" tag.
router = APIRouter(prefix="/api/projects/{project_name}/schedules", tags=["schedules"])
def validate_project_name(name: str) -> str:
    """Validate a project name to prevent path traversal.

    A valid name is 1-50 characters long and consists solely of ASCII
    letters, digits, underscores and hyphens.

    Args:
        name: Project name taken from the request path.

    Returns:
        The name, unchanged, when it is valid.

    Raises:
        HTTPException: 400 when the name contains any other character, is
            empty, or is longer than 50 characters.
    """
    # fullmatch rather than match() with a "$" anchor: "$" also matches just
    # before a trailing newline, so a name such as "proj\n" used to be accepted.
    if not re.fullmatch(r'[a-zA-Z0-9_-]{1,50}', name):
        raise HTTPException(
            status_code=400,
            detail="Invalid project name"
        )
    return name
@contextmanager
def _get_db_session(project_name: str) -> Generator[Tuple[Session, Path], None, None]:
    """Open a database session for a project and close it on exit.

    The project name is validated, resolved to a path through the registry,
    and the path is checked for existence before a session is created.

    Usage:
        with _get_db_session(project_name) as (db, project_path):
            # ... use db ...
        # db is automatically closed

    Args:
        project_name: Name of the project whose database should be opened.

    Yields:
        A ``(session, project_path)`` tuple.

    Raises:
        HTTPException: 400 for an invalid name (from validate_project_name);
            404 when the project is not in the registry or its directory
            does not exist.
    """
    from api.database import create_database

    project_name = validate_project_name(project_name)
    project_path = _get_project_path(project_name)

    if not project_path:
        raise HTTPException(
            status_code=404,
            detail=f"Project '{project_name}' not found in registry",
        )
    if not project_path.exists():
        raise HTTPException(
            status_code=404,
            detail=f"Project directory not found: {project_path}",
        )

    # create_database returns a pair; only the session factory is needed here.
    _engine, session_factory = create_database(project_path)
    session = session_factory()
    try:
        yield session, project_path
    finally:
        # Runs on normal exit and when the caller's block raises.
        session.close()
@router.get("", response_model=ScheduleListResponse)
async def list_schedules(project_name: str):
    """Return every schedule of a project, ordered by start time.

    Args:
        project_name: Project whose schedules are listed.

    Returns:
        A ScheduleListResponse with one ScheduleResponse per stored schedule.

    Raises:
        HTTPException: Propagated from _get_db_session (400 / 404).
    """
    from api.database import Schedule

    # Columns copied verbatim from each row into its response object.
    response_fields = (
        "id",
        "project_name",
        "start_time",
        "duration_minutes",
        "days_of_week",
        "enabled",
        "yolo_mode",
        "model",
        "crash_count",
        "created_at",
    )

    with _get_db_session(project_name) as (db, _):
        project_query = db.query(Schedule).filter(
            Schedule.project_name == project_name
        )
        rows = project_query.order_by(Schedule.start_time).all()

        items = [
            ScheduleResponse(**{name: getattr(row, name) for name in response_fields})
            for row in rows
        ]
        return ScheduleListResponse(schedules=items)
@router.post("", response_model=ScheduleResponse, status_code=201)
async def create_schedule(project_name: str, data: ScheduleCreate):
    """Create a schedule for a project and register it with the scheduler.

    The schedule is stored first. When it is enabled it is registered with
    the scheduler, and if the current UTC time already falls inside its
    window (and no unexpired "stop" override exists) the agent is started
    right away, because the cron trigger would not fire until the next
    occurrence. A failure to start the agent is logged and does not fail the
    request.

    Args:
        project_name: Project that owns the new schedule.
        data: Validated schedule fields from the request body.

    Returns:
        The stored schedule as a ScheduleResponse.

    Raises:
        HTTPException: 400 when the project already holds
            MAX_SCHEDULES_PER_PROJECT schedules; 400 / 404 propagated from
            _get_db_session.
    """
    from api.database import Schedule

    from ..services.scheduler_service import get_scheduler

    with _get_db_session(project_name) as (db, project_path):
        # Enforce the per-project cap before inserting anything.
        schedule_total = (
            db.query(Schedule)
            .filter(Schedule.project_name == project_name)
            .count()
        )
        if schedule_total >= MAX_SCHEDULES_PER_PROJECT:
            raise HTTPException(
                status_code=400,
                detail=f"Maximum schedules per project ({MAX_SCHEDULES_PER_PROJECT}) exceeded",
            )

        schedule = Schedule(
            project_name=project_name,
            start_time=data.start_time,
            duration_minutes=data.duration_minutes,
            days_of_week=data.days_of_week,
            enabled=data.enabled,
            yolo_mode=data.yolo_mode,
            model=data.model,
        )
        db.add(schedule)
        db.commit()
        # Reload so database-generated values (e.g. the id) are populated.
        db.refresh(schedule)

        if schedule.enabled:
            import logging

            log = logging.getLogger(__name__)
            scheduler = get_scheduler()

            await scheduler.add_schedule(project_name, schedule, project_path)
            log.info(f"Registered schedule {schedule.id} with APScheduler")

            # The cron trigger only fires at the next occurrence, so a window
            # that is already open has to be handled here.
            now = datetime.now(timezone.utc)
            in_window = scheduler._is_within_window(schedule, now)
            log.info(
                f"Schedule {schedule.id}: is_within_window={in_window}, now={now}, start={schedule.start_time}"
            )

            if in_window:
                from api.database import ScheduleOverride

                # An unexpired manual "stop" override suppresses the start.
                stop_override = (
                    db.query(ScheduleOverride)
                    .filter(
                        ScheduleOverride.schedule_id == schedule.id,
                        ScheduleOverride.override_type == "stop",
                        ScheduleOverride.expires_at > now,
                    )
                    .first()
                )
                log.info(f"Schedule {schedule.id}: has_override={stop_override is not None}")

                if not stop_override:
                    log.info(
                        f"Schedule {schedule.id} is within active window, starting agent immediately"
                    )
                    try:
                        await scheduler._start_agent(project_name, project_path, schedule)
                        log.info(f"Successfully started agent for schedule {schedule.id}")
                    except Exception as e:
                        # Best effort: the schedule itself was created fine.
                        log.error(
                            f"Failed to start agent for schedule {schedule.id}: {e}",
                            exc_info=True,
                        )

        response_fields = (
            "id",
            "project_name",
            "start_time",
            "duration_minutes",
            "days_of_week",
            "enabled",
            "yolo_mode",
            "model",
            "crash_count",
            "created_at",
        )
        return ScheduleResponse(
            **{name: getattr(schedule, name) for name in response_fields}
        )
@router.get("/next", response_model=NextRunResponse)
async def get_next_scheduled_run(project_name: str):
    """Calculate the next scheduled run across all enabled schedules.

    A schedule counts as active when the current UTC time is inside its
    window and no unexpired "stop" override exists for it. Every other
    enabled schedule -- outside its window, or inside it but manually
    stopped -- contributes its next start time instead.

    Args:
        project_name: Project whose schedules are inspected.

    Returns:
        A NextRunResponse. ``next_end`` is the latest end among the active
        windows; ``next_start`` is the earliest upcoming start and is only
        reported when nothing is currently active.

    Raises:
        HTTPException: Propagated from _get_db_session (400 / 404).
    """
    from api.database import Schedule, ScheduleOverride

    from ..services.scheduler_service import get_scheduler

    with _get_db_session(project_name) as (db, _):
        schedules = db.query(Schedule).filter(
            Schedule.project_name == project_name,
            Schedule.enabled == True,  # noqa: E712
        ).all()

        if not schedules:
            return NextRunResponse(
                has_schedules=False,
                next_start=None,
                next_end=None,
                is_currently_running=False,
                active_schedule_count=0,
            )

        now = datetime.now(timezone.utc)
        scheduler = get_scheduler()

        active_count = 0
        next_start = None
        latest_end = None

        for schedule in schedules:
            if scheduler._is_within_window(schedule, now):
                # Check for a manual stop override on the open window.
                override = db.query(ScheduleOverride).filter(
                    ScheduleOverride.schedule_id == schedule.id,
                    ScheduleOverride.override_type == "stop",
                    ScheduleOverride.expires_at > now,
                ).first()

                if not override:
                    # Schedule is active and not manually stopped.
                    active_count += 1
                    end_time = _calculate_window_end(schedule, now)
                    if latest_end is None or end_time > latest_end:
                        latest_end = end_time
                    continue
                # Stopped by override: treat the schedule as not active and
                # fall through so its next occurrence is still reported.
                # Previously such schedules were skipped entirely, which
                # could leave next_start empty although a run was upcoming.

            next_schedule_start = _calculate_next_start(schedule, now)
            if next_schedule_start and (next_start is None or next_schedule_start < next_start):
                next_start = next_schedule_start

        return NextRunResponse(
            has_schedules=True,
            next_start=next_start if active_count == 0 else None,
            next_end=latest_end,
            is_currently_running=active_count > 0,
            active_schedule_count=active_count,
        )
@router.get("/{schedule_id}", response_model=ScheduleResponse)
async def get_schedule(project_name: str, schedule_id: int):
    """Fetch one schedule of a project by its ID.

    Args:
        project_name: Project the schedule must belong to.
        schedule_id: Primary key of the schedule.

    Returns:
        The matching schedule as a ScheduleResponse.

    Raises:
        HTTPException: 404 when no schedule with this ID exists for the
            project; 400 / 404 propagated from _get_db_session.
    """
    from api.database import Schedule

    response_fields = (
        "id",
        "project_name",
        "start_time",
        "duration_minutes",
        "days_of_week",
        "enabled",
        "yolo_mode",
        "model",
        "crash_count",
        "created_at",
    )

    with _get_db_session(project_name) as (db, _):
        # Matching on the project as well keeps IDs from leaking across projects.
        found = (
            db.query(Schedule)
            .filter(
                Schedule.id == schedule_id,
                Schedule.project_name == project_name,
            )
            .first()
        )
        if not found:
            raise HTTPException(status_code=404, detail="Schedule not found")

        return ScheduleResponse(
            **{name: getattr(found, name) for name in response_fields}
        )
@router.patch("/{schedule_id}", response_model=ScheduleResponse)
async def update_schedule(
    project_name: str,
    schedule_id: int,
    data: ScheduleUpdate
):
    """Apply a partial update to a schedule and resync its scheduler jobs.

    Only fields present in the request are written, so ``{"model": null}``
    clears the model while omitting the field leaves it untouched. After the
    change is committed, an enabled schedule is re-registered with the
    scheduler and a schedule that has just been disabled has its jobs removed.

    Args:
        project_name: Project the schedule must belong to.
        schedule_id: Primary key of the schedule.
        data: Fields to change.

    Returns:
        The updated schedule as a ScheduleResponse.

    Raises:
        HTTPException: 404 when the schedule does not exist for the project;
            400 / 404 propagated from _get_db_session.
    """
    from api.database import Schedule

    from ..services.scheduler_service import get_scheduler

    response_fields = (
        "id",
        "project_name",
        "start_time",
        "duration_minutes",
        "days_of_week",
        "enabled",
        "yolo_mode",
        "model",
        "crash_count",
        "created_at",
    )

    with _get_db_session(project_name) as (db, project_path):
        record = (
            db.query(Schedule)
            .filter(
                Schedule.id == schedule_id,
                Schedule.project_name == project_name,
            )
            .first()
        )
        if not record:
            raise HTTPException(status_code=404, detail="Schedule not found")

        # Remember the previous state to detect an enabled -> disabled switch.
        enabled_before = record.enabled

        for attr_name, new_value in data.model_dump(exclude_unset=True).items():
            setattr(record, attr_name, new_value)
        db.commit()
        db.refresh(record)

        scheduler = get_scheduler()
        if record.enabled:
            # Re-register so the jobs pick up the updated times.
            await scheduler.add_schedule(project_name, record, project_path)
        elif enabled_before:
            scheduler.remove_schedule(schedule_id)

        return ScheduleResponse(
            **{name: getattr(record, name) for name in response_fields}
        )
@router.delete("/{schedule_id}", status_code=204)
async def delete_schedule(project_name: str, schedule_id: int):
    """Remove a schedule's scheduler jobs and delete it from the database.

    Args:
        project_name: Project the schedule must belong to.
        schedule_id: Primary key of the schedule.

    Raises:
        HTTPException: 404 when the schedule does not exist for the project;
            400 / 404 propagated from _get_db_session.
    """
    from api.database import Schedule

    from ..services.scheduler_service import get_scheduler

    with _get_db_session(project_name) as (db, _):
        doomed = (
            db.query(Schedule)
            .filter(
                Schedule.id == schedule_id,
                Schedule.project_name == project_name,
            )
            .first()
        )
        if not doomed:
            raise HTTPException(status_code=404, detail="Schedule not found")

        # Scheduler jobs go first, then the row itself.
        get_scheduler().remove_schedule(schedule_id)

        db.delete(doomed)
        db.commit()
def _calculate_window_end(schedule, now: datetime) -> datetime:
"""Calculate when the current window ends."""
start_hour, start_minute = map(int, schedule.start_time.split(":"))
# Create start time for today in UTC
window_start = now.replace(
hour=start_hour, minute=start_minute, second=0, microsecond=0
)
# If current time is before start time, the window started yesterday
if now < window_start:
window_start = window_start - timedelta(days=1)
return window_start + timedelta(minutes=schedule.duration_minutes)
def _calculate_next_start(schedule, now: datetime) -> datetime | None:
"""Calculate the next start time for a schedule."""
start_hour, start_minute = map(int, schedule.start_time.split(":"))
# Create start time for today
candidate = now.replace(
hour=start_hour, minute=start_minute, second=0, microsecond=0
)
# If already past today's start time, check tomorrow
if candidate <= now:
candidate = candidate + timedelta(days=1)
# Find the next active day
for _ in range(7):
if schedule.is_active_on_day(candidate.weekday()):
return candidate
candidate = candidate + timedelta(days=1)
return None