11 Commits

Author SHA1 Message Date
Auto
032752e564 0.1.2 2026-02-05 08:53:00 +02:00
Auto
c55a1a0182 fix: harden dev server RCE mitigations from PR #153
Address security gaps and improve validation in the dev server command
execution path introduced by PR #153:

Security fixes (critical):
- Add missing shell metacharacters to dangerous_ops blocklist: single &
  (Windows cmd.exe command separator), >, <, ^, %, \n, \r
- The single & gap was a confirmed RCE bypass on Windows where .cmd
  files are always executed via cmd.exe even with shell=False (CPython
  limitation documented in issue #77696)
- Apply validate_custom_command_strict at /start endpoint for
  defense-in-depth against config file tampering

Validation improvements:
- Fix uvicorn --flag=value syntax (split on = before comparing)
- Expand Python support: Django (manage.py), Flask, custom .py scripts
- Add runners: flask, poetry, cargo, go, npx
- Expand npm script allowlist: serve, develop, server, preview
- Reorder PATCH /config validation to run strict check first (fail fast)
- Extract constants: ALLOWED_NPM_SCRIPTS, ALLOWED_PYTHON_MODULES,
  BLOCKED_SHELLS for reuse and testability

Cleanup:
- Remove unused security.py imports from dev_server_manager.py
- Fix deprecated datetime.utcnow() -> datetime.now(timezone.utc)
- Remove unnecessary _remove_lock() in exception handlers where lock
  was never created (Popen failure path)

Tests:
- Add test_devserver_security.py with 78 tests covering valid commands,
  blocked shells, blocked commands, injection attempts, dangerous_ops
  blocking, and constant verification

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-05 08:52:47 +02:00
Leon van Zyl
75766a433a Merge pull request #153 from syphonetic/master
Implemented RCE mitigation measures
2026-02-05 08:31:28 +02:00
Leon van Zyl
ee993ed8ed Merge pull request #158 from Mediainvita/fix/temp-cleanup
fix: add automatic temp folder cleanup at Maestro startup
2026-02-05 08:20:23 +02:00
Manuel Fischer
a3b0abdc31 fix: add automatic temp folder cleanup at Maestro startup
Problem:
When AutoForge runs agents that use Playwright for browser testing or
mongodb-memory-server for database tests, temporary files accumulate in
the system temp folder (%TEMP% on Windows, /tmp on Linux/macOS). These
files are never cleaned up automatically and can consume hundreds of GB
over time.

Affected temp items:
- playwright_firefoxdev_profile-* (browser profiles)
- playwright-artifacts-* (test artifacts)
- playwright-transform-cache
- mongodb-memory-server* (MongoDB binaries)
- ng-* (Angular CLI temp)
- scoped_dir* (Chrome/Chromium temp)
- .78912*.node (Node.js native module cache, ~7MB each)
- claude-*-cwd (Claude CLI working directory files)
- mat-debug-*.log (Material/Angular debug logs)

Solution:
- New temp_cleanup.py module with cleanup_stale_temp() function
- Called at Maestro (orchestrator) startup in autonomous_agent_demo.py
- Only deletes files/folders older than 1 hour (safe for running processes)
- Runs every time the Play button is clicked or agent auto-restarts
- Reports cleanup stats: dirs deleted, files deleted, MB freed

Why cleanup at Maestro startup:
- Reliable hook point (runs on every agent start, including auto-restart
  after rate limits which happens every ~5 hours)
- No need for background timers or scheduled tasks
- Cleanup happens before new temp files are created

Testing:
- Tested on Windows with 958 items in temp folder
- Successfully cleaned 45 dirs, 758 files, freed 415 MB
- Files younger than 1 hour correctly preserved

Closes #155

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-05 00:08:26 +01:00
Auto
326f38b3c4 version patch 2026-02-04 15:41:15 +02:00
syphonetic
81d2f0cbe0 Merge branch 'master' into master 2026-02-04 05:50:35 +08:00
syphonetic
c7c88449ad Remove unused dev server management functions
Removed unused functions and endpoints related to dev server management, including command validation and configuration updates.
2026-02-04 02:34:29 +08:00
syphonetic
9622da9561 Remove unnecessary blank line in dev_server_manager.py 2026-02-04 02:34:06 +08:00
syphonetic
83d2182107 Refactor dev server API for security and validation
Refactor dev server API to enhance security and command validation. Added logging and improved command handling.
2026-02-04 02:19:19 +08:00
syphonetic
7651436c27 Refactor dev server command execution and locking
Refactor dev server management to improve command execution and security checks. Introduce lock file handling and command validation enhancements.
2026-02-04 02:18:55 +08:00
8 changed files with 668 additions and 47 deletions

View File

@@ -263,6 +263,17 @@ def main() -> None:
) )
else: else:
# Entry point mode - always use unified orchestrator # Entry point mode - always use unified orchestrator
# Clean up stale temp files before starting (prevents temp folder bloat)
from temp_cleanup import cleanup_stale_temp
cleanup_stats = cleanup_stale_temp()
if cleanup_stats["dirs_deleted"] > 0 or cleanup_stats["files_deleted"] > 0:
mb_freed = cleanup_stats["bytes_freed"] / (1024 * 1024)
print(
f"[CLEANUP] Removed {cleanup_stats['dirs_deleted']} dirs, "
f"{cleanup_stats['files_deleted']} files ({mb_freed:.1f} MB freed)",
flush=True,
)
from parallel_orchestrator import run_parallel_orchestrator from parallel_orchestrator import run_parallel_orchestrator
# Clamp concurrency to valid range (1-5) # Clamp concurrency to valid range (1-5)

View File

@@ -1,6 +1,6 @@
{ {
"name": "autoforge-ai", "name": "autoforge-ai",
"version": "0.1.1", "version": "0.1.2",
"description": "Autonomous coding agent with web UI - build complete apps with AI", "description": "Autonomous coding agent with web UI - build complete apps with AI",
"license": "AGPL-3.0", "license": "AGPL-3.0",
"bin": { "bin": {

View File

@@ -7,6 +7,7 @@ Uses project registry for path lookups and project_config for command detection.
""" """
import logging import logging
import shlex
import sys import sys
from pathlib import Path from pathlib import Path
@@ -72,6 +73,116 @@ def get_project_dir(project_name: str) -> Path:
return project_dir return project_dir
ALLOWED_RUNNERS = {
"npm", "pnpm", "yarn", "npx",
"uvicorn", "python", "python3",
"flask", "poetry",
"cargo", "go",
}
ALLOWED_NPM_SCRIPTS = {"dev", "start", "serve", "develop", "server", "preview"}
# Allowed Python -m modules for dev servers
ALLOWED_PYTHON_MODULES = {"uvicorn", "flask", "gunicorn", "http.server"}
BLOCKED_SHELLS = {"sh", "bash", "zsh", "cmd", "powershell", "pwsh", "cmd.exe"}
def validate_custom_command_strict(cmd: str) -> None:
"""
Strict allowlist validation for dev server commands.
Prevents arbitrary command execution (no sh -c, no cmd /c, no python -c, etc.)
"""
if not isinstance(cmd, str) or not cmd.strip():
raise ValueError("custom_command cannot be empty")
argv = shlex.split(cmd, posix=(sys.platform != "win32"))
if not argv:
raise ValueError("custom_command could not be parsed")
base = Path(argv[0]).name.lower()
# Block direct shells / interpreters commonly used for command injection
if base in BLOCKED_SHELLS:
raise ValueError(f"custom_command runner not allowed: {base}")
if base not in ALLOWED_RUNNERS:
raise ValueError(
f"custom_command runner not allowed: {base}. "
f"Allowed: {', '.join(sorted(ALLOWED_RUNNERS))}"
)
# Block one-liner execution for python
lowered = [a.lower() for a in argv]
if base in {"python", "python3"}:
if "-c" in lowered:
raise ValueError("python -c is not allowed")
if len(argv) >= 3 and argv[1] == "-m":
# Allow: python -m <allowed_module> ...
if argv[2] not in ALLOWED_PYTHON_MODULES:
raise ValueError(
f"python -m {argv[2]} is not allowed. "
f"Allowed modules: {', '.join(sorted(ALLOWED_PYTHON_MODULES))}"
)
elif len(argv) >= 2 and argv[1].endswith(".py"):
# Allow: python manage.py runserver, python app.py, etc.
pass
else:
raise ValueError(
"Python commands must use 'python -m <module> ...' or 'python <script>.py ...'"
)
if base == "flask":
# Allow: flask run [--host ...] [--port ...]
if len(argv) < 2 or argv[1] != "run":
raise ValueError("flask custom_command must be 'flask run [options]'")
if base == "poetry":
# Allow: poetry run <subcmd> ...
if len(argv) < 3 or argv[1] != "run":
raise ValueError("poetry custom_command must be 'poetry run <command> ...'")
if base == "uvicorn":
if len(argv) < 2 or ":" not in argv[1]:
raise ValueError("uvicorn must specify an app like module:app")
allowed_flags = {"--host", "--port", "--reload", "--log-level", "--workers"}
for a in argv[2:]:
if a.startswith("-"):
# Handle --flag=value syntax
flag_key = a.split("=", 1)[0]
if flag_key not in allowed_flags:
raise ValueError(f"uvicorn flag not allowed: {flag_key}")
if base in {"npm", "pnpm", "yarn"}:
# Allow only known safe scripts (no arbitrary exec)
if base == "npm":
if len(argv) < 3 or argv[1] != "run" or argv[2] not in ALLOWED_NPM_SCRIPTS:
raise ValueError(
f"npm custom_command must be 'npm run <script>' where script is one of: "
f"{', '.join(sorted(ALLOWED_NPM_SCRIPTS))}"
)
elif base == "pnpm":
ok = (
(len(argv) >= 2 and argv[1] in ALLOWED_NPM_SCRIPTS)
or (len(argv) >= 3 and argv[1] == "run" and argv[2] in ALLOWED_NPM_SCRIPTS)
)
if not ok:
raise ValueError(
f"pnpm custom_command must use a known script: "
f"{', '.join(sorted(ALLOWED_NPM_SCRIPTS))}"
)
elif base == "yarn":
ok = (
(len(argv) >= 2 and argv[1] in ALLOWED_NPM_SCRIPTS)
or (len(argv) >= 3 and argv[1] == "run" and argv[2] in ALLOWED_NPM_SCRIPTS)
)
if not ok:
raise ValueError(
f"yarn custom_command must use a known script: "
f"{', '.join(sorted(ALLOWED_NPM_SCRIPTS))}"
)
def get_project_devserver_manager(project_name: str): def get_project_devserver_manager(project_name: str):
""" """
@@ -180,8 +291,11 @@ async def start_devserver(
# Determine which command to use # Determine which command to use
command: str | None command: str | None
if request.command: if request.command:
command = request.command raise HTTPException(
else: status_code=400,
detail="Direct command execution is disabled. Use /config to set a safe custom_command."
)
command = get_dev_command(project_dir) command = get_dev_command(project_dir)
if not command: if not command:
@@ -193,6 +307,13 @@ async def start_devserver(
# Validate command against security allowlist before execution # Validate command against security allowlist before execution
validate_dev_command(command, project_dir) validate_dev_command(command, project_dir)
# Defense-in-depth: also run strict structural validation at execution time
# (catches config file tampering that bypasses the /config endpoint)
try:
validate_custom_command_strict(command)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
# Now command is definitely str and validated # Now command is definitely str and validated
success, message = await manager.start(command) success, message = await manager.start(command)
@@ -284,7 +405,13 @@ async def update_devserver_config(
except ValueError as e: except ValueError as e:
raise HTTPException(status_code=400, detail=str(e)) raise HTTPException(status_code=400, detail=str(e))
else: else:
# Validate command against security allowlist before persisting # Strict structural validation first (most specific errors)
try:
validate_custom_command_strict(update.custom_command)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
# Then validate against security allowlist
validate_dev_command(update.custom_command, project_dir) validate_dev_command(update.custom_command, project_dir)
# Set the custom command # Set the custom command

View File

@@ -14,17 +14,17 @@ This is a simplified version of AgentProcessManager, tailored for dev servers:
import asyncio import asyncio
import logging import logging
import re import re
import shlex
import subprocess import subprocess
import sys import sys
import threading import threading
from datetime import datetime from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from typing import Awaitable, Callable, Literal, Set from typing import Awaitable, Callable, Literal, Set
import psutil import psutil
from registry import list_registered_projects from registry import list_registered_projects
from security import extract_commands, get_effective_commands, is_command_allowed
from server.utils.process_utils import kill_process_tree from server.utils.process_utils import kill_process_tree
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -291,53 +291,54 @@ class DevServerProcessManager:
Start the dev server as a subprocess. Start the dev server as a subprocess.
Args: Args:
command: The shell command to run (e.g., "npm run dev") command: The command to run (e.g., "npm run dev")
Returns: Returns:
Tuple of (success, message) Tuple of (success, message)
""" """
if self.status == "running": # Already running?
if self.process and self.status == "running":
return False, "Dev server is already running" return False, "Dev server is already running"
# Lock check (prevents double-start)
if not self._check_lock(): if not self._check_lock():
return False, "Another dev server instance is already running for this project" return False, "Dev server already running (lock file present)"
# Validate that project directory exists command = (command or "").strip()
if not self.project_dir.exists(): if not command:
return False, f"Project directory does not exist: {self.project_dir}" return False, "Empty dev server command"
# Defense-in-depth: validate command against security allowlist # SECURITY: block shell operators/metacharacters (defense-in-depth)
commands = extract_commands(command) # NOTE: On Windows, .cmd/.bat files are executed via cmd.exe even with
if not commands: # shell=False (CPython limitation), so metacharacter blocking is critical.
return False, "Could not parse command for security validation" # Single & is a cmd.exe command separator, ^ is cmd escape, % enables
# environment variable expansion, > < enable redirection.
dangerous_ops = ["&&", "||", ";", "|", "`", "$(", "&", ">", "<", "^", "%"]
if any(op in command for op in dangerous_ops):
return False, "Shell operators are not allowed in dev server command"
# Block newline injection (cmd.exe interprets newlines as command separators)
if "\n" in command or "\r" in command:
return False, "Newlines are not allowed in dev server command"
allowed_commands, blocked_commands = get_effective_commands(self.project_dir) # Parse into argv and execute without shell
for cmd in commands: argv = shlex.split(command, posix=(sys.platform != "win32"))
if cmd in blocked_commands: if not argv:
logger.warning("Blocked dev server command '%s' (in blocklist) for %s", cmd, self.project_name) return False, "Empty dev server command"
return False, f"Command '{cmd}' is blocked and cannot be used as a dev server command"
if not is_command_allowed(cmd, allowed_commands):
logger.warning("Rejected dev server command '%s' (not in allowlist) for %s", cmd, self.project_name)
return False, f"Command '{cmd}' is not in the allowed commands list"
self._command = command base = Path(argv[0]).name.lower()
self._detected_url = None # Reset URL detection
# Defense-in-depth: reject direct shells/interpreters commonly used for injection
if base in {"sh", "bash", "zsh", "cmd", "powershell", "pwsh"}:
return False, f"Shell runner '{base}' is not allowed for dev server commands"
# Windows: use .cmd shims for Node package managers
if sys.platform == "win32" and base in {"npm", "pnpm", "yarn", "npx"} and not argv[0].lower().endswith(".cmd"):
argv[0] = argv[0] + ".cmd"
try: try:
# Determine shell based on platform
if sys.platform == "win32":
# On Windows, use cmd.exe
shell_cmd = ["cmd", "/c", command]
else:
# On Unix-like systems, use sh
shell_cmd = ["sh", "-c", command]
# Start subprocess with piped stdout/stderr
# stdin=DEVNULL prevents interactive dev servers from blocking on stdin
# On Windows, use CREATE_NO_WINDOW to prevent console window from flashing
if sys.platform == "win32": if sys.platform == "win32":
self.process = subprocess.Popen( self.process = subprocess.Popen(
shell_cmd, argv,
stdin=subprocess.DEVNULL, stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, stderr=subprocess.STDOUT,
@@ -346,23 +347,33 @@ class DevServerProcessManager:
) )
else: else:
self.process = subprocess.Popen( self.process = subprocess.Popen(
shell_cmd, argv,
stdin=subprocess.DEVNULL, stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, stderr=subprocess.STDOUT,
cwd=str(self.project_dir), cwd=str(self.project_dir),
) )
self._create_lock() self._command = command
self.started_at = datetime.now() self.started_at = datetime.now(timezone.utc)
self.status = "running" self._detected_url = None
# Start output streaming task # Create lock once we have a PID
self._create_lock()
# Start output streaming
self.status = "running"
self._output_task = asyncio.create_task(self._stream_output()) self._output_task = asyncio.create_task(self._stream_output())
return True, f"Dev server started with PID {self.process.pid}" return True, "Dev server started"
except FileNotFoundError:
self.status = "stopped"
self.process = None
return False, f"Command not found: {argv[0]}"
except Exception as e: except Exception as e:
logger.exception("Failed to start dev server") self.status = "stopped"
self.process = None
return False, f"Failed to start dev server: {e}" return False, f"Failed to start dev server: {e}"
async def stop(self) -> tuple[bool, str]: async def stop(self) -> tuple[bool, str]:

148
temp_cleanup.py Normal file
View File

@@ -0,0 +1,148 @@
"""
Temp Cleanup Module
===================
Cleans up stale temporary files and directories created by AutoForge agents,
Playwright, Node.js, and other development tools.
Called at Maestro (orchestrator) startup to prevent temp folder bloat.
Why this exists:
- Playwright creates browser profiles and artifacts in %TEMP%
- Node.js creates .node cache files (~7MB each, can accumulate to GBs)
- MongoDB Memory Server downloads binaries to temp
- These are never cleaned up automatically
When cleanup runs:
- At Maestro startup (when you click Play or auto-restart after rate limits)
- Only files/folders older than 1 hour are deleted (safe for running processes)
"""
import logging
import shutil
import tempfile
import time
from pathlib import Path
logger = logging.getLogger(__name__)
# Max age in seconds before a temp item is considered stale (1 hour)
MAX_AGE_SECONDS = 3600
# Directory patterns to clean up (glob patterns)
DIR_PATTERNS = [
"playwright_firefoxdev_profile-*", # Playwright Firefox profiles
"playwright-artifacts-*", # Playwright test artifacts
"playwright-transform-cache", # Playwright transform cache
"mongodb-memory-server*", # MongoDB Memory Server binaries
"ng-*", # Angular CLI temp directories
"scoped_dir*", # Chrome/Chromium temp directories
]
# File patterns to clean up (glob patterns)
FILE_PATTERNS = [
".78912*.node", # Node.js native module cache (major space consumer, ~7MB each)
"claude-*-cwd", # Claude CLI working directory temp files
"mat-debug-*.log", # Material/Angular debug logs
]
def cleanup_stale_temp(max_age_seconds: int = MAX_AGE_SECONDS) -> dict:
"""
Clean up stale temporary files and directories.
Only deletes items older than max_age_seconds to avoid
interfering with currently running processes.
Args:
max_age_seconds: Maximum age in seconds before an item is deleted.
Defaults to 1 hour (3600 seconds).
Returns:
Dictionary with cleanup statistics:
- dirs_deleted: Number of directories deleted
- files_deleted: Number of files deleted
- bytes_freed: Approximate bytes freed
- errors: List of error messages (for debugging, not fatal)
"""
temp_dir = Path(tempfile.gettempdir())
cutoff_time = time.time() - max_age_seconds
stats = {
"dirs_deleted": 0,
"files_deleted": 0,
"bytes_freed": 0,
"errors": [],
}
# Clean up directories
for pattern in DIR_PATTERNS:
for item in temp_dir.glob(pattern):
if not item.is_dir():
continue
try:
mtime = item.stat().st_mtime
if mtime < cutoff_time:
size = _get_dir_size(item)
shutil.rmtree(item, ignore_errors=True)
if not item.exists():
stats["dirs_deleted"] += 1
stats["bytes_freed"] += size
logger.debug(f"Deleted temp directory: {item}")
except Exception as e:
stats["errors"].append(f"Failed to delete {item}: {e}")
logger.debug(f"Failed to delete {item}: {e}")
# Clean up files
for pattern in FILE_PATTERNS:
for item in temp_dir.glob(pattern):
if not item.is_file():
continue
try:
mtime = item.stat().st_mtime
if mtime < cutoff_time:
size = item.stat().st_size
item.unlink(missing_ok=True)
if not item.exists():
stats["files_deleted"] += 1
stats["bytes_freed"] += size
logger.debug(f"Deleted temp file: {item}")
except Exception as e:
stats["errors"].append(f"Failed to delete {item}: {e}")
logger.debug(f"Failed to delete {item}: {e}")
# Log summary if anything was cleaned
if stats["dirs_deleted"] > 0 or stats["files_deleted"] > 0:
mb_freed = stats["bytes_freed"] / (1024 * 1024)
logger.info(
f"Temp cleanup: {stats['dirs_deleted']} dirs, "
f"{stats['files_deleted']} files, {mb_freed:.1f} MB freed"
)
return stats
def _get_dir_size(path: Path) -> int:
"""Get total size of a directory in bytes."""
total = 0
try:
for item in path.rglob("*"):
if item.is_file():
try:
total += item.stat().st_size
except (OSError, PermissionError):
pass
except (OSError, PermissionError):
pass
return total
if __name__ == "__main__":
# Allow running directly for testing/manual cleanup
logging.basicConfig(level=logging.DEBUG)
print("Running temp cleanup...")
stats = cleanup_stale_temp()
mb_freed = stats["bytes_freed"] / (1024 * 1024)
print(f"Cleanup complete: {stats['dirs_deleted']} dirs, {stats['files_deleted']} files, {mb_freed:.1f} MB freed")
if stats["errors"]:
print(f"Errors (non-fatal): {len(stats['errors'])}")

319
test_devserver_security.py Normal file
View File

@@ -0,0 +1,319 @@
#!/usr/bin/env python3
"""
Dev Server Security Tests
=========================
Tests for dev server command validation and security hardening.
Run with: python -m pytest test_devserver_security.py -v
"""
import sys
from pathlib import Path
import pytest
# Add project root to path
sys.path.insert(0, str(Path(__file__).parent))
from server.routers.devserver import (
ALLOWED_NPM_SCRIPTS,
ALLOWED_PYTHON_MODULES,
ALLOWED_RUNNERS,
BLOCKED_SHELLS,
validate_custom_command_strict,
)
# =============================================================================
# validate_custom_command_strict - Valid commands
# =============================================================================
class TestValidCommands:
"""Commands that should pass validation."""
def test_npm_run_dev(self):
validate_custom_command_strict("npm run dev")
def test_npm_run_start(self):
validate_custom_command_strict("npm run start")
def test_npm_run_serve(self):
validate_custom_command_strict("npm run serve")
def test_npm_run_preview(self):
validate_custom_command_strict("npm run preview")
def test_pnpm_dev(self):
validate_custom_command_strict("pnpm dev")
def test_pnpm_run_dev(self):
validate_custom_command_strict("pnpm run dev")
def test_yarn_start(self):
validate_custom_command_strict("yarn start")
def test_yarn_run_serve(self):
validate_custom_command_strict("yarn run serve")
def test_uvicorn_basic(self):
validate_custom_command_strict("uvicorn main:app")
def test_uvicorn_with_flags(self):
validate_custom_command_strict("uvicorn main:app --host 0.0.0.0 --port 8000 --reload")
def test_uvicorn_flag_equals_syntax(self):
validate_custom_command_strict("uvicorn main:app --port=8000 --host=0.0.0.0")
def test_python_m_uvicorn(self):
validate_custom_command_strict("python -m uvicorn main:app --reload")
def test_python3_m_uvicorn(self):
validate_custom_command_strict("python3 -m uvicorn main:app")
def test_python_m_flask(self):
validate_custom_command_strict("python -m flask run")
def test_python_m_gunicorn(self):
validate_custom_command_strict("python -m gunicorn main:app")
def test_python_m_http_server(self):
validate_custom_command_strict("python -m http.server 8000")
def test_python_script(self):
validate_custom_command_strict("python app.py")
def test_python_manage_py_runserver(self):
validate_custom_command_strict("python manage.py runserver")
def test_python_manage_py_runserver_with_port(self):
validate_custom_command_strict("python manage.py runserver 0.0.0.0:8000")
def test_flask_run(self):
validate_custom_command_strict("flask run")
def test_flask_run_with_options(self):
validate_custom_command_strict("flask run --host 0.0.0.0 --port 5000")
def test_poetry_run_command(self):
validate_custom_command_strict("poetry run python app.py")
def test_cargo_run(self):
# cargo is allowed but has no special sub-validation
validate_custom_command_strict("cargo run")
def test_go_run(self):
# go is allowed but has no special sub-validation
validate_custom_command_strict("go run .")
# =============================================================================
# validate_custom_command_strict - Blocked shells
# =============================================================================
class TestBlockedShells:
"""Shell interpreters that must be rejected."""
@pytest.mark.parametrize("shell", ["sh", "bash", "zsh", "cmd", "powershell", "pwsh", "cmd.exe"])
def test_blocked_shell(self, shell):
with pytest.raises(ValueError, match="runner not allowed"):
validate_custom_command_strict(f"{shell} -c 'echo hacked'")
# =============================================================================
# validate_custom_command_strict - Blocked commands
# =============================================================================
class TestBlockedCommands:
"""Commands that should be rejected."""
def test_empty_command(self):
with pytest.raises(ValueError, match="cannot be empty"):
validate_custom_command_strict("")
def test_whitespace_only(self):
with pytest.raises(ValueError, match="cannot be empty"):
validate_custom_command_strict(" ")
def test_python_dash_c(self):
with pytest.raises(ValueError, match="python -c is not allowed"):
validate_custom_command_strict("python -c 'import os; os.system(\"rm -rf /\")'")
def test_python3_dash_c(self):
with pytest.raises(ValueError, match="python -c is not allowed"):
validate_custom_command_strict("python3 -c 'print(1)'")
def test_python_no_script_or_module(self):
with pytest.raises(ValueError, match="must use"):
validate_custom_command_strict("python --version")
def test_python_m_disallowed_module(self):
with pytest.raises(ValueError, match="not allowed"):
validate_custom_command_strict("python -m pip install something")
def test_unknown_runner(self):
with pytest.raises(ValueError, match="runner not allowed"):
validate_custom_command_strict("curl http://evil.com")
def test_rm_rf(self):
with pytest.raises(ValueError, match="runner not allowed"):
validate_custom_command_strict("rm -rf /")
def test_npm_arbitrary_script(self):
with pytest.raises(ValueError, match="npm custom_command"):
validate_custom_command_strict("npm run postinstall")
def test_npm_exec(self):
with pytest.raises(ValueError, match="npm custom_command"):
validate_custom_command_strict("npm exec evil-package")
def test_pnpm_arbitrary_script(self):
with pytest.raises(ValueError, match="pnpm custom_command"):
validate_custom_command_strict("pnpm run postinstall")
def test_yarn_arbitrary_script(self):
with pytest.raises(ValueError, match="yarn custom_command"):
validate_custom_command_strict("yarn run postinstall")
def test_uvicorn_no_app(self):
with pytest.raises(ValueError, match="must specify an app"):
validate_custom_command_strict("uvicorn --reload")
def test_uvicorn_disallowed_flag(self):
with pytest.raises(ValueError, match="flag not allowed"):
validate_custom_command_strict("uvicorn main:app --factory")
def test_flask_no_run(self):
with pytest.raises(ValueError, match="flask custom_command"):
validate_custom_command_strict("flask shell")
def test_poetry_no_run(self):
with pytest.raises(ValueError, match="poetry custom_command"):
validate_custom_command_strict("poetry install")
# =============================================================================
# validate_custom_command_strict - Injection attempts
# =============================================================================
class TestInjectionAttempts:
"""Adversarial inputs that attempt to bypass validation."""
def test_shell_via_path_traversal(self):
with pytest.raises(ValueError, match="runner not allowed"):
validate_custom_command_strict("/bin/sh -c 'echo hacked'")
def test_shell_via_relative_path(self):
with pytest.raises(ValueError, match="runner not allowed"):
validate_custom_command_strict("../../bin/bash -c whoami")
def test_none_input(self):
with pytest.raises(ValueError, match="cannot be empty"):
validate_custom_command_strict(None) # type: ignore[arg-type]
def test_integer_input(self):
with pytest.raises(ValueError, match="cannot be empty"):
validate_custom_command_strict(123) # type: ignore[arg-type]
def test_python_dash_c_uppercase(self):
with pytest.raises(ValueError, match="python -c is not allowed"):
validate_custom_command_strict("python -C 'exec(evil)'")
def test_powershell_via_path(self):
with pytest.raises(ValueError, match="runner not allowed"):
validate_custom_command_strict("C:\\Windows\\System32\\powershell.exe -c Get-Process")
# =============================================================================
# dev_server_manager.py - dangerous_ops blocking
# =============================================================================
class TestDangerousOpsBlocking:
"""Test the metacharacter blocking in dev_server_manager.start()."""
@pytest.fixture
def manager(self, tmp_path):
from server.services.dev_server_manager import DevServerProcessManager
return DevServerProcessManager("test-project", tmp_path)
@pytest.mark.asyncio
@pytest.mark.parametrize("cmd,desc", [
("npm run dev && curl evil.com", "double ampersand"),
("npm run dev & curl evil.com", "single ampersand"),
("npm run dev || curl evil.com", "double pipe"),
("npm run dev | curl evil.com", "single pipe"),
("npm run dev ; curl evil.com", "semicolon"),
("npm run dev `curl evil.com`", "backtick"),
("npm run dev $(curl evil.com)", "dollar paren"),
("npm run dev > /etc/passwd", "output redirect"),
("npm run dev < /etc/passwd", "input redirect"),
("npm run dev ^& calc", "caret escape"),
("npm run %COMSPEC%", "percent env expansion"),
])
async def test_blocks_shell_operator(self, manager, cmd, desc):
success, message = await manager.start(cmd)
assert not success, f"Should block {desc}: {cmd}"
assert "not allowed" in message.lower()
@pytest.mark.asyncio
async def test_blocks_newline_injection(self, manager):
success, message = await manager.start("npm run dev\ncurl evil.com")
assert not success
assert "newline" in message.lower()
@pytest.mark.asyncio
async def test_blocks_carriage_return(self, manager):
success, message = await manager.start("npm run dev\r\ncurl evil.com")
assert not success
assert "newline" in message.lower()
@pytest.mark.asyncio
@pytest.mark.parametrize("shell", ["sh", "bash", "zsh", "cmd", "powershell", "pwsh"])
async def test_blocks_shell_runners(self, manager, shell):
success, message = await manager.start(f"{shell} -c 'echo hacked'")
assert not success
assert "not allowed" in message.lower()
@pytest.mark.asyncio
async def test_blocks_empty_command(self, manager):
success, message = await manager.start("")
assert not success
assert "empty" in message.lower()
@pytest.mark.asyncio
async def test_blocks_whitespace_command(self, manager):
success, message = await manager.start(" ")
assert not success
assert "empty" in message.lower()
# =============================================================================
# Constants validation
# =============================================================================
class TestConstants:
"""Verify security constants are properly defined."""
def test_all_common_shells_blocked(self):
for shell in ["sh", "bash", "zsh", "cmd", "powershell", "pwsh", "cmd.exe"]:
assert shell in BLOCKED_SHELLS, f"{shell} should be in BLOCKED_SHELLS"
def test_common_npm_scripts_allowed(self):
for script in ["dev", "start", "serve", "preview"]:
assert script in ALLOWED_NPM_SCRIPTS, f"{script} should be in ALLOWED_NPM_SCRIPTS"
def test_common_python_modules_allowed(self):
for mod in ["uvicorn", "flask", "gunicorn"]:
assert mod in ALLOWED_PYTHON_MODULES, f"{mod} should be in ALLOWED_PYTHON_MODULES"
def test_common_runners_allowed(self):
for runner in ["npm", "pnpm", "yarn", "python", "python3", "uvicorn", "flask", "cargo", "go"]:
assert runner in ALLOWED_RUNNERS, f"{runner} should be in ALLOWED_RUNNERS"
if __name__ == "__main__":
pytest.main([__file__, "-v"])

6
ui/package-lock.json generated
View File

@@ -20,6 +20,7 @@
"@xterm/addon-web-links": "^0.12.0", "@xterm/addon-web-links": "^0.12.0",
"@xterm/xterm": "^6.0.0", "@xterm/xterm": "^6.0.0",
"@xyflow/react": "^12.10.0", "@xyflow/react": "^12.10.0",
"autoforge-ai": "file:..",
"canvas-confetti": "^1.9.4", "canvas-confetti": "^1.9.4",
"class-variance-authority": "^0.7.1", "class-variance-authority": "^0.7.1",
"clsx": "^2.1.1", "clsx": "^2.1.1",
@@ -53,7 +54,6 @@
"..": { "..": {
"name": "autoforge-ai", "name": "autoforge-ai",
"version": "0.1.0", "version": "0.1.0",
"extraneous": true,
"license": "AGPL-3.0", "license": "AGPL-3.0",
"bin": { "bin": {
"autoforge": "bin/autoforge.js" "autoforge": "bin/autoforge.js"
@@ -3148,6 +3148,10 @@
"node": ">=10" "node": ">=10"
} }
}, },
"node_modules/autoforge-ai": {
"resolved": "..",
"link": true
},
"node_modules/balanced-match": { "node_modules/balanced-match": {
"version": "1.0.2", "version": "1.0.2",
"resolved": "https://registry.npmjs.org/balanced-match/-/balanced-match-1.0.2.tgz", "resolved": "https://registry.npmjs.org/balanced-match/-/balanced-match-1.0.2.tgz",

View File

@@ -24,6 +24,7 @@
"@xterm/addon-web-links": "^0.12.0", "@xterm/addon-web-links": "^0.12.0",
"@xterm/xterm": "^6.0.0", "@xterm/xterm": "^6.0.0",
"@xyflow/react": "^12.10.0", "@xyflow/react": "^12.10.0",
"autoforge-ai": "file:..",
"canvas-confetti": "^1.9.4", "canvas-confetti": "^1.9.4",
"class-variance-authority": "^0.7.1", "class-variance-authority": "^0.7.1",
"clsx": "^2.1.1", "clsx": "^2.1.1",