""" Filesystem Router ================== API endpoints for browsing the filesystem for project folder selection. Provides cross-platform support for Windows, macOS, and Linux. """ import functools import logging import os import re import sys from pathlib import Path from fastapi import APIRouter, HTTPException, Query from security import SENSITIVE_DIRECTORIES # Module logger logger = logging.getLogger(__name__) from ..schemas import ( CreateDirectoryRequest, DirectoryEntry, DirectoryListResponse, DriveInfo, PathValidationResponse, ) router = APIRouter(prefix="/api/filesystem", tags=["filesystem"]) # ============================================================================= # Platform-Specific Blocked Paths # ============================================================================= # Windows blocked paths WINDOWS_BLOCKED = { "C:\\Windows", "C:\\Program Files", "C:\\Program Files (x86)", "C:\\ProgramData", "C:\\System Volume Information", "C:\\$Recycle.Bin", "C:\\Recovery", } # macOS blocked paths MACOS_BLOCKED = { "/System", "/Library", "/private", "/usr", "/bin", "/sbin", "/etc", "/var", "/Volumes", "/cores", "/opt", } # Linux blocked paths LINUX_BLOCKED = { "/etc", "/var", "/usr", "/bin", "/sbin", "/boot", "/proc", "/sys", "/dev", "/root", "/lib", "/lib64", "/run", "/tmp", "/opt", } # Universal blocked paths (relative to home directory). # Delegates to the canonical SENSITIVE_DIRECTORIES set in security.py so that # the filesystem browser and the EXTRA_READ_PATHS validator share one source of truth. UNIVERSAL_BLOCKED_RELATIVE = SENSITIVE_DIRECTORIES # Patterns for files that should not be shown HIDDEN_PATTERNS = [ r"^\.env", # .env files r".*\.key$", # Key files r".*\.pem$", # PEM files r".*credentials.*", # Credential files r".*secrets.*", # Secrets files ] @functools.lru_cache(maxsize=1) def get_blocked_paths() -> frozenset[Path]: """ Get the set of blocked paths for the current platform. Cached because the platform and home directory do not change at runtime, and this function is called once per directory entry in list_directory(). """ home = Path.home() blocked = set() # Add platform-specific blocked paths if sys.platform == "win32": for p in WINDOWS_BLOCKED: blocked.add(Path(p).resolve()) elif sys.platform == "darwin": for p in MACOS_BLOCKED: blocked.add(Path(p).resolve()) else: # Linux for p in LINUX_BLOCKED: blocked.add(Path(p).resolve()) # Add universal blocked paths (relative to home) for rel in UNIVERSAL_BLOCKED_RELATIVE: blocked.add((home / rel).resolve()) return frozenset(blocked) def is_path_blocked(path: Path) -> bool: """Check if a path is in the blocked list.""" try: resolved = path.resolve() except (OSError, ValueError): return True # Can't resolve = blocked blocked_paths = get_blocked_paths() # Check if path is exactly a blocked path or inside one for blocked in blocked_paths: try: resolved.relative_to(blocked) return True except ValueError: pass # Also check if blocked is inside path (for parent directories) if resolved == blocked: return True return False def is_hidden_file(path: Path) -> bool: """Check if a file/directory is hidden (cross-platform).""" name = path.name # Unix-style: starts with dot if name.startswith('.'): return True # Windows: check FILE_ATTRIBUTE_HIDDEN if sys.platform == "win32": try: import ctypes attrs = ctypes.windll.kernel32.GetFileAttributesW(str(path)) if attrs != -1 and (attrs & 0x02): # FILE_ATTRIBUTE_HIDDEN return True except Exception: pass return False def matches_blocked_pattern(name: str) -> bool: """Check if filename matches a blocked pattern.""" for pattern in HIDDEN_PATTERNS: if re.match(pattern, name, re.IGNORECASE): return True return False def is_unc_path(path_str: str) -> bool: """Check if path is a Windows UNC path (network share).""" return path_str.startswith("\\\\") or path_str.startswith("//") # ============================================================================= # Endpoints # ============================================================================= @router.get("/list", response_model=DirectoryListResponse) async def list_directory( path: str | None = Query(None, description="Directory path to list (defaults to home)"), show_hidden: bool = Query(False, description="Include hidden files"), ): """ List contents of a directory. Returns directories only (for folder selection). On Windows, includes available drives. """ # Default to home directory if path is None or path == "": target = Path.home() else: # Security: Block UNC paths if is_unc_path(path): logger.warning("Blocked UNC path access attempt: %s", path) raise HTTPException( status_code=403, detail="Network paths (UNC) are not allowed" ) target = Path(path) # Resolve symlinks and get absolute path try: target = target.resolve() except (OSError, ValueError) as e: raise HTTPException(status_code=400, detail=f"Invalid path: {e}") # Security: Check if path is blocked if is_path_blocked(target): logger.warning("Blocked access to restricted path: %s", target) raise HTTPException( status_code=403, detail="Access to this directory is not allowed" ) # Check if path exists and is a directory if not target.exists(): raise HTTPException(status_code=404, detail="Directory not found") if not target.is_dir(): raise HTTPException(status_code=400, detail="Path is not a directory") # Check read permission if not os.access(target, os.R_OK): raise HTTPException(status_code=403, detail="No read permission") # List directory contents entries = [] try: for item in sorted(target.iterdir(), key=lambda x: x.name.lower()): # Skip if blocked pattern if matches_blocked_pattern(item.name): continue # Check if hidden hidden = is_hidden_file(item) if hidden and not show_hidden: continue # Security: Skip if item path is blocked if is_path_blocked(item): continue # Only include directories for folder browsing if item.is_dir(): try: # Check if directory has any subdirectories has_children = False try: for child in item.iterdir(): if child.is_dir() and not is_path_blocked(child): has_children = True break except (PermissionError, OSError): pass # Can't read = assume no children entries.append(DirectoryEntry( name=item.name, path=item.as_posix(), is_directory=True, is_hidden=hidden, size=None, has_children=has_children, )) except Exception: pass # Skip items we can't process except PermissionError: raise HTTPException(status_code=403, detail="Permission denied") except OSError as e: raise HTTPException(status_code=500, detail=f"Error reading directory: {e}") # Calculate parent path parent_path = None if target != target.parent: # Not at root parent = target.parent # Don't expose parent if it's blocked if not is_path_blocked(parent): parent_path = parent.as_posix() # Get drives on Windows drives = None if sys.platform == "win32": drives = get_windows_drives() return DirectoryListResponse( current_path=target.as_posix(), parent_path=parent_path, entries=entries, drives=drives, ) @router.get("/drives", response_model=list[DriveInfo] | None) async def list_drives(): """ List available drives (Windows only). Returns null on non-Windows platforms. """ if sys.platform != "win32": return None return get_windows_drives() def get_windows_drives() -> list[DriveInfo]: """Get list of available drives on Windows.""" drives = [] try: import ctypes import string # Get bitmask of available drives bitmask = ctypes.windll.kernel32.GetLogicalDrives() for i, letter in enumerate(string.ascii_uppercase): if bitmask & (1 << i): drive_path = f"{letter}:\\" try: # Try to get volume label volume_name = ctypes.create_unicode_buffer(1024) ctypes.windll.kernel32.GetVolumeInformationW( drive_path, volume_name, 1024, None, None, None, None, 0 ) label = volume_name.value or f"Local Disk ({letter}:)" except Exception: label = f"Drive ({letter}:)" # Check if drive is accessible available = os.path.exists(drive_path) drives.append(DriveInfo( letter=letter, label=label, available=available, )) except Exception: # Fallback: just list C: drive drives.append(DriveInfo(letter="C", label="Local Disk (C:)", available=True)) return drives @router.post("/validate", response_model=PathValidationResponse) async def validate_path(path: str = Query(..., description="Path to validate")): """ Validate if a path is accessible and writable. Used to check a path before creating a project there. """ # Security: Block UNC paths if is_unc_path(path): return PathValidationResponse( valid=False, exists=False, is_directory=False, can_read=False, can_write=False, message="Network paths (UNC) are not allowed", ) try: target = Path(path).resolve() except (OSError, ValueError) as e: return PathValidationResponse( valid=False, exists=False, is_directory=False, can_read=False, can_write=False, message=f"Invalid path: {e}", ) # Security: Check if blocked if is_path_blocked(target): return PathValidationResponse( valid=False, exists=target.exists(), is_directory=target.is_dir() if target.exists() else False, can_read=False, can_write=False, message="Access to this directory is not allowed", ) exists = target.exists() is_dir = target.is_dir() if exists else False can_read = os.access(target, os.R_OK) if exists else False can_write = os.access(target, os.W_OK) if exists else False # For non-existent paths, check if parent is writable if not exists: parent = target.parent parent_exists = parent.exists() parent_writable = os.access(parent, os.W_OK) if parent_exists else False can_write = parent_writable valid = is_dir and can_read and can_write if exists else can_write message = "" if not exists: message = "Directory does not exist (will be created)" elif not is_dir: message = "Path is not a directory" elif not can_read: message = "No read permission" elif not can_write: message = "No write permission" return PathValidationResponse( valid=valid, exists=exists, is_directory=is_dir, can_read=can_read, can_write=can_write, message=message, ) @router.post("/create-directory") async def create_directory(request: CreateDirectoryRequest): """ Create a new directory inside a parent directory. Used for creating project folders from the folder browser. """ # Validate directory name name = request.name.strip() if not name: raise HTTPException(status_code=400, detail="Directory name cannot be empty") # Security: Block special directory names that could enable traversal if name in ('.', '..') or '..' in name: raise HTTPException( status_code=400, detail="Invalid directory name" ) # Security: Check for invalid characters invalid_chars = '<>:"/\\|?*' if sys.platform == "win32" else '/' if any(c in name for c in invalid_chars): raise HTTPException( status_code=400, detail="Directory name contains invalid characters" ) # Security: Block UNC paths if is_unc_path(request.parent_path): raise HTTPException(status_code=403, detail="Network paths are not allowed") try: parent = Path(request.parent_path).resolve() except (OSError, ValueError) as e: raise HTTPException(status_code=400, detail=f"Invalid parent path: {e}") # Security: Check if parent is blocked if is_path_blocked(parent): raise HTTPException( status_code=403, detail="Cannot create directory in this location" ) # Check parent exists and is writable if not parent.exists(): raise HTTPException(status_code=404, detail="Parent directory not found") if not parent.is_dir(): raise HTTPException(status_code=400, detail="Parent path is not a directory") if not os.access(parent, os.W_OK): raise HTTPException(status_code=403, detail="No write permission") # Create the new directory new_dir = parent / name if new_dir.exists(): raise HTTPException(status_code=409, detail="Directory already exists") try: new_dir.mkdir(parents=False, exist_ok=False) logger.info("Created directory: %s", new_dir) except OSError as e: logger.error("Failed to create directory %s: %s", new_dir, e) raise HTTPException(status_code=500, detail=f"Failed to create directory: {e}") return { "success": True, "path": new_dir.as_posix(), "message": f"Created directory: {name}", } @router.get("/home") async def get_home_directory(): """Get the user's home directory path.""" home = Path.home() return { "path": home.as_posix(), "display_path": str(home), }