Merge pull request #153 from syphonetic/master

Implemented RCE mitigation measures
This commit is contained in:
Leon van Zyl
2026-02-05 08:31:28 +02:00
committed by GitHub
2 changed files with 112 additions and 42 deletions

View File

@@ -8,6 +8,7 @@ Uses project registry for path lookups and project_config for command detection.
import logging import logging
import sys import sys
import shlex
from pathlib import Path from pathlib import Path
from fastapi import APIRouter, HTTPException from fastapi import APIRouter, HTTPException
@@ -72,6 +73,64 @@ def get_project_dir(project_name: str) -> Path:
return project_dir return project_dir
ALLOWED_RUNNERS = {"npm", "pnpm", "yarn", "uvicorn", "python", "python3"}
def validate_custom_command_strict(cmd: str) -> None:
"""
Strict allowlist validation for dev server commands.
Prevents arbitrary command execution (no sh -c, no cmd /c, no python -c, etc.)
"""
if not isinstance(cmd, str) or not cmd.strip():
raise ValueError("custom_command cannot be empty")
argv = shlex.split(cmd, posix=(sys.platform != "win32"))
if not argv:
raise ValueError("custom_command could not be parsed")
base = Path(argv[0]).name.lower()
# Block direct shells / interpreters commonly used for command injection
if base in {"sh", "bash", "zsh", "cmd", "powershell", "pwsh"}:
raise ValueError(f"custom_command runner not allowed: {base}")
if base not in ALLOWED_RUNNERS:
raise ValueError(f"custom_command runner not allowed: {base}")
# Block one-liner execution
lowered = [a.lower() for a in argv]
if base in {"python", "python3"}:
if "-c" in lowered:
raise ValueError("python -c is not allowed")
# Only allow: python -m uvicorn ...
if len(argv) < 3 or argv[1:3] != ["-m", "uvicorn"]:
raise ValueError("Only 'python -m uvicorn ...' is allowed")
if base == "uvicorn":
if len(argv) < 2 or ":" not in argv[1]:
raise ValueError("uvicorn must specify an app like module:app")
allowed_flags = {"--host", "--port", "--reload", "--log-level", "--workers"}
i = 2
while i < len(argv):
a = argv[i]
if a.startswith("-") and a not in allowed_flags:
raise ValueError(f"uvicorn flag not allowed: {a}")
i += 1
if base in {"npm", "pnpm", "yarn"}:
# Allow only dev/start scripts (no arbitrary exec)
if base == "npm":
if len(argv) < 3 or argv[1] != "run" or argv[2] not in {"dev", "start"}:
raise ValueError("npm custom_command must be 'npm run dev' or 'npm run start'")
elif base == "pnpm":
ok = (len(argv) >= 2 and argv[1] in {"dev", "start"}) or (len(argv) >= 3 and argv[1] == "run" and argv[2] in {"dev", "start"})
if not ok:
raise ValueError("pnpm custom_command must be 'pnpm dev/start' or 'pnpm run dev/start'")
elif base == "yarn":
ok = (len(argv) >= 2 and argv[1] in {"dev", "start"}) or (len(argv) >= 3 and argv[1] == "run" and argv[2] in {"dev", "start"})
if not ok:
raise ValueError("yarn custom_command must be 'yarn dev/start' or 'yarn run dev/start'")
def get_project_devserver_manager(project_name: str): def get_project_devserver_manager(project_name: str):
""" """
@@ -180,9 +239,12 @@ async def start_devserver(
# Determine which command to use # Determine which command to use
command: str | None command: str | None
if request.command: if request.command:
command = request.command raise HTTPException(
else: status_code=400,
command = get_dev_command(project_dir) detail="Direct command execution is disabled. Use /config to set a safe custom_command."
)
command = get_dev_command(project_dir)
if not command: if not command:
raise HTTPException( raise HTTPException(
@@ -289,6 +351,7 @@ async def update_devserver_config(
# Set the custom command # Set the custom command
try: try:
validate_custom_command_strict(update.custom_command)
set_dev_command(project_dir, update.custom_command) set_dev_command(project_dir, update.custom_command)
except ValueError as e: except ValueError as e:
raise HTTPException(status_code=400, detail=str(e)) raise HTTPException(status_code=400, detail=str(e))

View File

@@ -17,6 +17,7 @@ import re
import subprocess import subprocess
import sys import sys
import threading import threading
import shlex
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from typing import Awaitable, Callable, Literal, Set from typing import Awaitable, Callable, Literal, Set
@@ -291,53 +292,47 @@ class DevServerProcessManager:
Start the dev server as a subprocess. Start the dev server as a subprocess.
Args: Args:
command: The shell command to run (e.g., "npm run dev") command: The command to run (e.g., "npm run dev")
Returns: Returns:
Tuple of (success, message) Tuple of (success, message)
""" """
if self.status == "running": # Already running?
if self.process and self.status == "running":
return False, "Dev server is already running" return False, "Dev server is already running"
# Lock check (prevents double-start)
if not self._check_lock(): if not self._check_lock():
return False, "Another dev server instance is already running for this project" return False, "Dev server already running (lock file present)"
# Validate that project directory exists command = (command or "").strip()
if not self.project_dir.exists(): if not command:
return False, f"Project directory does not exist: {self.project_dir}" return False, "Empty dev server command"
# Defense-in-depth: validate command against security allowlist # SECURITY: block shell operators/metacharacters (defense-in-depth)
commands = extract_commands(command) dangerous_ops = ["&&", "||", ";", "|", "`", "$("]
if not commands: if any(op in command for op in dangerous_ops):
return False, "Could not parse command for security validation" return False, "Shell operators are not allowed in dev server command"
allowed_commands, blocked_commands = get_effective_commands(self.project_dir) # Parse into argv and execute without shell
for cmd in commands: argv = shlex.split(command, posix=(sys.platform != "win32"))
if cmd in blocked_commands: if not argv:
logger.warning("Blocked dev server command '%s' (in blocklist) for %s", cmd, self.project_name) return False, "Empty dev server command"
return False, f"Command '{cmd}' is blocked and cannot be used as a dev server command"
if not is_command_allowed(cmd, allowed_commands):
logger.warning("Rejected dev server command '%s' (not in allowlist) for %s", cmd, self.project_name)
return False, f"Command '{cmd}' is not in the allowed commands list"
self._command = command base = Path(argv[0]).name.lower()
self._detected_url = None # Reset URL detection
# Defense-in-depth: reject direct shells/interpreters commonly used for injection
if base in {"sh", "bash", "zsh", "cmd", "powershell", "pwsh"}:
return False, f"Shell runner '{base}' is not allowed for dev server commands"
# Windows: use .cmd shims for Node package managers
if sys.platform == "win32" and base in {"npm", "pnpm", "yarn", "npx"} and not argv[0].lower().endswith(".cmd"):
argv[0] = argv[0] + ".cmd"
try: try:
# Determine shell based on platform
if sys.platform == "win32":
# On Windows, use cmd.exe
shell_cmd = ["cmd", "/c", command]
else:
# On Unix-like systems, use sh
shell_cmd = ["sh", "-c", command]
# Start subprocess with piped stdout/stderr
# stdin=DEVNULL prevents interactive dev servers from blocking on stdin
# On Windows, use CREATE_NO_WINDOW to prevent console window from flashing
if sys.platform == "win32": if sys.platform == "win32":
self.process = subprocess.Popen( self.process = subprocess.Popen(
shell_cmd, argv,
stdin=subprocess.DEVNULL, stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, stderr=subprocess.STDOUT,
@@ -346,23 +341,35 @@ class DevServerProcessManager:
) )
else: else:
self.process = subprocess.Popen( self.process = subprocess.Popen(
shell_cmd, argv,
stdin=subprocess.DEVNULL, stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, stderr=subprocess.STDOUT,
cwd=str(self.project_dir), cwd=str(self.project_dir),
) )
self._create_lock() self._command = command
self.started_at = datetime.now() self.started_at = datetime.utcnow()
self.status = "running" self._detected_url = None
# Start output streaming task # Create lock once we have a PID
self._create_lock()
# Start output streaming
self.status = "running"
self._output_task = asyncio.create_task(self._stream_output()) self._output_task = asyncio.create_task(self._stream_output())
return True, f"Dev server started with PID {self.process.pid}" return True, "Dev server started"
except FileNotFoundError:
self.status = "stopped"
self.process = None
self._remove_lock()
return False, f"Command not found: {argv[0]}"
except Exception as e: except Exception as e:
logger.exception("Failed to start dev server") self.status = "stopped"
self.process = None
self._remove_lock()
return False, f"Failed to start dev server: {e}" return False, f"Failed to start dev server: {e}"
async def stop(self) -> tuple[bool, str]: async def stop(self) -> tuple[bool, str]: