mirror of
https://github.com/AutoMaker-Org/automaker.git
synced 2026-01-30 06:12:03 +00:00
Merge pull request #272 from AutoMaker-Org/fix/task-execution
feat: Implement throttling and retry logic in secure-fs module
This commit is contained in:
@@ -20,4 +20,9 @@ export const {
|
||||
lstat,
|
||||
joinPath,
|
||||
resolvePath,
|
||||
// Throttling configuration and monitoring
|
||||
configureThrottling,
|
||||
getThrottlingConfig,
|
||||
getPendingOperations,
|
||||
getActiveOperations,
|
||||
} = secureFs;
|
||||
|
||||
@@ -86,8 +86,9 @@ Binary file ${cleanPath} added
|
||||
`;
|
||||
}
|
||||
|
||||
if (stats.size > MAX_SYNTHETIC_DIFF_SIZE) {
|
||||
const sizeKB = Math.round(stats.size / 1024);
|
||||
const fileSize = Number(stats.size);
|
||||
if (fileSize > MAX_SYNTHETIC_DIFF_SIZE) {
|
||||
const sizeKB = Math.round(fileSize / 1024);
|
||||
return createNewFileDiff(cleanPath, '100644', [`[File too large to display: ${sizeKB}KB]`]);
|
||||
}
|
||||
|
||||
|
||||
@@ -18,7 +18,8 @@
|
||||
"author": "AutoMaker Team",
|
||||
"license": "SEE LICENSE IN LICENSE",
|
||||
"dependencies": {
|
||||
"@automaker/types": "^1.0.0"
|
||||
"@automaker/types": "^1.0.0",
|
||||
"p-limit": "^6.2.0"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@types/node": "^22.10.5",
|
||||
|
||||
@@ -4,19 +4,149 @@
|
||||
* All file I/O operations must go through this adapter to enforce
|
||||
* ALLOWED_ROOT_DIRECTORY restrictions at the actual access point,
|
||||
* not just at the API layer. This provides defense-in-depth security.
|
||||
*
|
||||
* This module also implements:
|
||||
* - Concurrency limiting via p-limit to prevent ENFILE/EMFILE errors
|
||||
* - Retry logic with exponential backoff for transient file descriptor errors
|
||||
*/
|
||||
|
||||
import fs from 'fs/promises';
|
||||
import type { Dirent } from 'fs';
|
||||
import path from 'path';
|
||||
import pLimit from 'p-limit';
|
||||
import { validatePath } from './security.js';
|
||||
|
||||
/**
|
||||
* Configuration for file operation throttling
|
||||
*/
|
||||
interface ThrottleConfig {
|
||||
/** Maximum concurrent file operations (default: 100) */
|
||||
maxConcurrency: number;
|
||||
/** Maximum retry attempts for ENFILE/EMFILE errors (default: 3) */
|
||||
maxRetries: number;
|
||||
/** Base delay in ms for exponential backoff (default: 100) */
|
||||
baseDelay: number;
|
||||
/** Maximum delay in ms for exponential backoff (default: 5000) */
|
||||
maxDelay: number;
|
||||
}
|
||||
|
||||
const DEFAULT_CONFIG: ThrottleConfig = {
|
||||
maxConcurrency: 100,
|
||||
maxRetries: 3,
|
||||
baseDelay: 100,
|
||||
maxDelay: 5000,
|
||||
};
|
||||
|
||||
let config: ThrottleConfig = { ...DEFAULT_CONFIG };
|
||||
let fsLimit = pLimit(config.maxConcurrency);
|
||||
|
||||
/**
|
||||
* Configure the file operation throttling settings
|
||||
* @param newConfig - Partial configuration to merge with defaults
|
||||
*/
|
||||
export function configureThrottling(newConfig: Partial<ThrottleConfig>): void {
|
||||
const newConcurrency = newConfig.maxConcurrency;
|
||||
|
||||
if (newConcurrency !== undefined && newConcurrency !== config.maxConcurrency) {
|
||||
if (fsLimit.activeCount > 0 || fsLimit.pendingCount > 0) {
|
||||
throw new Error(
|
||||
`[SecureFS] Cannot change maxConcurrency while operations are in flight. Active: ${fsLimit.activeCount}, Pending: ${fsLimit.pendingCount}`
|
||||
);
|
||||
}
|
||||
fsLimit = pLimit(newConcurrency);
|
||||
}
|
||||
|
||||
config = { ...config, ...newConfig };
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the current throttling configuration
|
||||
*/
|
||||
export function getThrottlingConfig(): Readonly<ThrottleConfig> {
|
||||
return { ...config };
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the number of pending operations in the queue
|
||||
*/
|
||||
export function getPendingOperations(): number {
|
||||
return fsLimit.pendingCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the number of active operations currently running
|
||||
*/
|
||||
export function getActiveOperations(): number {
|
||||
return fsLimit.activeCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* Error codes that indicate file descriptor exhaustion
|
||||
*/
|
||||
const FILE_DESCRIPTOR_ERROR_CODES = new Set(['ENFILE', 'EMFILE']);
|
||||
|
||||
/**
|
||||
* Check if an error is a file descriptor exhaustion error
|
||||
*/
|
||||
function isFileDescriptorError(error: unknown): boolean {
|
||||
if (error && typeof error === 'object' && 'code' in error) {
|
||||
return FILE_DESCRIPTOR_ERROR_CODES.has((error as { code: string }).code);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate delay with exponential backoff and jitter
|
||||
*/
|
||||
function calculateDelay(attempt: number): number {
|
||||
const exponentialDelay = config.baseDelay * Math.pow(2, attempt);
|
||||
const jitter = Math.random() * config.baseDelay;
|
||||
return Math.min(exponentialDelay + jitter, config.maxDelay);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sleep for a specified duration
|
||||
*/
|
||||
function sleep(ms: number): Promise<void> {
|
||||
return new Promise((resolve) => setTimeout(resolve, ms));
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a file operation with throttling and retry logic
|
||||
*/
|
||||
async function executeWithRetry<T>(operation: () => Promise<T>, operationName: string): Promise<T> {
|
||||
return fsLimit(async () => {
|
||||
let lastError: unknown;
|
||||
|
||||
for (let attempt = 0; attempt <= config.maxRetries; attempt++) {
|
||||
try {
|
||||
return await operation();
|
||||
} catch (error) {
|
||||
lastError = error;
|
||||
|
||||
if (isFileDescriptorError(error) && attempt < config.maxRetries) {
|
||||
const delay = calculateDelay(attempt);
|
||||
console.warn(
|
||||
`[SecureFS] ${operationName}: File descriptor error (attempt ${attempt + 1}/${config.maxRetries + 1}), retrying in ${delay}ms`
|
||||
);
|
||||
await sleep(delay);
|
||||
continue;
|
||||
}
|
||||
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
throw lastError;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrapper around fs.access that validates path first
|
||||
*/
|
||||
export async function access(filePath: string, mode?: number): Promise<void> {
|
||||
const validatedPath = validatePath(filePath);
|
||||
return fs.access(validatedPath, mode);
|
||||
return executeWithRetry(() => fs.access(validatedPath, mode), `access(${filePath})`);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -27,10 +157,12 @@ export async function readFile(
|
||||
encoding?: BufferEncoding
|
||||
): Promise<string | Buffer> {
|
||||
const validatedPath = validatePath(filePath);
|
||||
if (encoding) {
|
||||
return fs.readFile(validatedPath, encoding);
|
||||
}
|
||||
return fs.readFile(validatedPath);
|
||||
return executeWithRetry<string | Buffer>(() => {
|
||||
if (encoding) {
|
||||
return fs.readFile(validatedPath, encoding);
|
||||
}
|
||||
return fs.readFile(validatedPath);
|
||||
}, `readFile(${filePath})`);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -42,7 +174,10 @@ export async function writeFile(
|
||||
encoding?: BufferEncoding
|
||||
): Promise<void> {
|
||||
const validatedPath = validatePath(filePath);
|
||||
return fs.writeFile(validatedPath, data, encoding);
|
||||
return executeWithRetry(
|
||||
() => fs.writeFile(validatedPath, data, encoding),
|
||||
`writeFile(${filePath})`
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -53,7 +188,7 @@ export async function mkdir(
|
||||
options?: { recursive?: boolean; mode?: number }
|
||||
): Promise<string | undefined> {
|
||||
const validatedPath = validatePath(dirPath);
|
||||
return fs.mkdir(validatedPath, options);
|
||||
return executeWithRetry(() => fs.mkdir(validatedPath, options), `mkdir(${dirPath})`);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -72,18 +207,20 @@ export async function readdir(
|
||||
options?: { withFileTypes?: boolean; encoding?: BufferEncoding }
|
||||
): Promise<string[] | Dirent[]> {
|
||||
const validatedPath = validatePath(dirPath);
|
||||
if (options?.withFileTypes === true) {
|
||||
return fs.readdir(validatedPath, { withFileTypes: true });
|
||||
}
|
||||
return fs.readdir(validatedPath);
|
||||
return executeWithRetry<string[] | Dirent[]>(() => {
|
||||
if (options?.withFileTypes === true) {
|
||||
return fs.readdir(validatedPath, { withFileTypes: true });
|
||||
}
|
||||
return fs.readdir(validatedPath);
|
||||
}, `readdir(${dirPath})`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrapper around fs.stat that validates path first
|
||||
*/
|
||||
export async function stat(filePath: string): Promise<any> {
|
||||
export async function stat(filePath: string): Promise<ReturnType<typeof fs.stat>> {
|
||||
const validatedPath = validatePath(filePath);
|
||||
return fs.stat(validatedPath);
|
||||
return executeWithRetry(() => fs.stat(validatedPath), `stat(${filePath})`);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -94,7 +231,7 @@ export async function rm(
|
||||
options?: { recursive?: boolean; force?: boolean }
|
||||
): Promise<void> {
|
||||
const validatedPath = validatePath(filePath);
|
||||
return fs.rm(validatedPath, options);
|
||||
return executeWithRetry(() => fs.rm(validatedPath, options), `rm(${filePath})`);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -102,7 +239,7 @@ export async function rm(
|
||||
*/
|
||||
export async function unlink(filePath: string): Promise<void> {
|
||||
const validatedPath = validatePath(filePath);
|
||||
return fs.unlink(validatedPath);
|
||||
return executeWithRetry(() => fs.unlink(validatedPath), `unlink(${filePath})`);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -111,7 +248,10 @@ export async function unlink(filePath: string): Promise<void> {
|
||||
export async function copyFile(src: string, dest: string, mode?: number): Promise<void> {
|
||||
const validatedSrc = validatePath(src);
|
||||
const validatedDest = validatePath(dest);
|
||||
return fs.copyFile(validatedSrc, validatedDest, mode);
|
||||
return executeWithRetry(
|
||||
() => fs.copyFile(validatedSrc, validatedDest, mode),
|
||||
`copyFile(${src}, ${dest})`
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -123,7 +263,10 @@ export async function appendFile(
|
||||
encoding?: BufferEncoding
|
||||
): Promise<void> {
|
||||
const validatedPath = validatePath(filePath);
|
||||
return fs.appendFile(validatedPath, data, encoding);
|
||||
return executeWithRetry(
|
||||
() => fs.appendFile(validatedPath, data, encoding),
|
||||
`appendFile(${filePath})`
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -132,16 +275,19 @@ export async function appendFile(
|
||||
export async function rename(oldPath: string, newPath: string): Promise<void> {
|
||||
const validatedOldPath = validatePath(oldPath);
|
||||
const validatedNewPath = validatePath(newPath);
|
||||
return fs.rename(validatedOldPath, validatedNewPath);
|
||||
return executeWithRetry(
|
||||
() => fs.rename(validatedOldPath, validatedNewPath),
|
||||
`rename(${oldPath}, ${newPath})`
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrapper around fs.lstat that validates path first
|
||||
* Returns file stats without following symbolic links
|
||||
*/
|
||||
export async function lstat(filePath: string): Promise<any> {
|
||||
export async function lstat(filePath: string): Promise<ReturnType<typeof fs.lstat>> {
|
||||
const validatedPath = validatePath(filePath);
|
||||
return fs.lstat(validatedPath);
|
||||
return executeWithRetry(() => fs.lstat(validatedPath), `lstat(${filePath})`);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -130,7 +130,7 @@ describe('node-finder', () => {
|
||||
const delimiter = path.delimiter;
|
||||
|
||||
it("should return current path unchanged when nodePath is 'node'", () => {
|
||||
const currentPath = '/usr/bin:/usr/local/bin';
|
||||
const currentPath = `/usr/bin${delimiter}/usr/local/bin`;
|
||||
const result = buildEnhancedPath('node', currentPath);
|
||||
|
||||
expect(result).toBe(currentPath);
|
||||
@@ -144,7 +144,7 @@ describe('node-finder', () => {
|
||||
|
||||
it('should prepend node directory to path', () => {
|
||||
const nodePath = '/opt/homebrew/bin/node';
|
||||
const currentPath = '/usr/bin:/usr/local/bin';
|
||||
const currentPath = `/usr/bin${delimiter}/usr/local/bin`;
|
||||
|
||||
const result = buildEnhancedPath(nodePath, currentPath);
|
||||
|
||||
@@ -153,7 +153,7 @@ describe('node-finder', () => {
|
||||
|
||||
it('should not duplicate node directory if already in path', () => {
|
||||
const nodePath = '/usr/local/bin/node';
|
||||
const currentPath = '/usr/local/bin:/usr/bin';
|
||||
const currentPath = `/usr/local/bin${delimiter}/usr/bin`;
|
||||
|
||||
const result = buildEnhancedPath(nodePath, currentPath);
|
||||
|
||||
|
||||
136
libs/platform/tests/secure-fs.test.ts
Normal file
136
libs/platform/tests/secure-fs.test.ts
Normal file
@@ -0,0 +1,136 @@
|
||||
/**
|
||||
* Unit tests for secure-fs throttling and retry logic
|
||||
*/
|
||||
|
||||
import { describe, it, expect, beforeEach, vi } from 'vitest';
|
||||
import * as secureFs from '../src/secure-fs.js';
|
||||
|
||||
describe('secure-fs throttling', () => {
|
||||
beforeEach(() => {
|
||||
// Reset throttling configuration before each test
|
||||
secureFs.configureThrottling({
|
||||
maxConcurrency: 100,
|
||||
maxRetries: 3,
|
||||
baseDelay: 100,
|
||||
maxDelay: 5000,
|
||||
});
|
||||
});
|
||||
|
||||
describe('configureThrottling', () => {
|
||||
it('should update configuration with new values', () => {
|
||||
secureFs.configureThrottling({ maxConcurrency: 50 });
|
||||
const config = secureFs.getThrottlingConfig();
|
||||
expect(config.maxConcurrency).toBe(50);
|
||||
});
|
||||
|
||||
it('should preserve existing values when updating partial config', () => {
|
||||
secureFs.configureThrottling({ maxRetries: 5 });
|
||||
const config = secureFs.getThrottlingConfig();
|
||||
expect(config.maxConcurrency).toBe(100); // Default value preserved
|
||||
expect(config.maxRetries).toBe(5);
|
||||
});
|
||||
});
|
||||
|
||||
describe('getThrottlingConfig', () => {
|
||||
it('should return current configuration', () => {
|
||||
const config = secureFs.getThrottlingConfig();
|
||||
expect(config).toHaveProperty('maxConcurrency');
|
||||
expect(config).toHaveProperty('maxRetries');
|
||||
expect(config).toHaveProperty('baseDelay');
|
||||
expect(config).toHaveProperty('maxDelay');
|
||||
});
|
||||
|
||||
it('should return default values initially', () => {
|
||||
const config = secureFs.getThrottlingConfig();
|
||||
expect(config.maxConcurrency).toBe(100);
|
||||
expect(config.maxRetries).toBe(3);
|
||||
expect(config.baseDelay).toBe(100);
|
||||
expect(config.maxDelay).toBe(5000);
|
||||
});
|
||||
});
|
||||
|
||||
describe('getPendingOperations', () => {
|
||||
it('should return 0 when no operations are pending', () => {
|
||||
expect(secureFs.getPendingOperations()).toBe(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe('getActiveOperations', () => {
|
||||
it('should return 0 when no operations are active', () => {
|
||||
expect(secureFs.getActiveOperations()).toBe(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe('concurrency limiting', () => {
|
||||
it('should apply maxConcurrency configuration', () => {
|
||||
secureFs.configureThrottling({ maxConcurrency: 2 });
|
||||
|
||||
// This test verifies that the configuration is applied.
|
||||
// A more robust integration test should verify the actual concurrency behavior
|
||||
// by observing getActiveOperations() and getPendingOperations() under load.
|
||||
expect(secureFs.getThrottlingConfig().maxConcurrency).toBe(2);
|
||||
});
|
||||
|
||||
it('should throw when changing maxConcurrency while operations are in flight', async () => {
|
||||
// We can't easily simulate in-flight operations without mocking,
|
||||
// but we can verify the check exists by testing when no ops are in flight
|
||||
expect(secureFs.getActiveOperations()).toBe(0);
|
||||
expect(secureFs.getPendingOperations()).toBe(0);
|
||||
|
||||
// Should not throw when no operations in flight
|
||||
expect(() => secureFs.configureThrottling({ maxConcurrency: 50 })).not.toThrow();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('file descriptor error handling', () => {
|
||||
it('should have retry configuration for file descriptor errors', () => {
|
||||
const config = secureFs.getThrottlingConfig();
|
||||
expect(config.maxRetries).toBe(3);
|
||||
expect(config.baseDelay).toBe(100);
|
||||
expect(config.maxDelay).toBe(5000);
|
||||
});
|
||||
|
||||
it('should allow configuring retry parameters', () => {
|
||||
secureFs.configureThrottling({ maxRetries: 5, baseDelay: 200 });
|
||||
const config = secureFs.getThrottlingConfig();
|
||||
expect(config.maxRetries).toBe(5);
|
||||
expect(config.baseDelay).toBe(200);
|
||||
});
|
||||
});
|
||||
|
||||
describe('retry logic behavior', () => {
|
||||
beforeEach(() => {
|
||||
secureFs.configureThrottling({
|
||||
maxConcurrency: 100,
|
||||
maxRetries: 3,
|
||||
baseDelay: 10, // Use short delays for tests
|
||||
maxDelay: 50,
|
||||
});
|
||||
});
|
||||
|
||||
// Note: Due to ESM module limitations, we cannot easily mock fs/promises directly.
|
||||
// These tests verify the configuration is correctly set up for retry behavior.
|
||||
// The actual retry logic is integration-tested when real file descriptor errors occur.
|
||||
|
||||
it('should have correct retry configuration for ENFILE/EMFILE errors', () => {
|
||||
const config = secureFs.getThrottlingConfig();
|
||||
expect(config.maxRetries).toBe(3);
|
||||
expect(config.baseDelay).toBe(10);
|
||||
expect(config.maxDelay).toBe(50);
|
||||
});
|
||||
|
||||
it('should expose operation counts for monitoring', () => {
|
||||
// These should be 0 when no operations are in flight
|
||||
expect(secureFs.getActiveOperations()).toBe(0);
|
||||
expect(secureFs.getPendingOperations()).toBe(0);
|
||||
});
|
||||
|
||||
it('should allow customizing retry behavior', () => {
|
||||
secureFs.configureThrottling({ maxRetries: 5, baseDelay: 200, maxDelay: 10000 });
|
||||
const config = secureFs.getThrottlingConfig();
|
||||
expect(config.maxRetries).toBe(5);
|
||||
expect(config.baseDelay).toBe(200);
|
||||
expect(config.maxDelay).toBe(10000);
|
||||
});
|
||||
});
|
||||
30
package-lock.json
generated
30
package-lock.json
generated
@@ -250,7 +250,8 @@
|
||||
"version": "1.0.0",
|
||||
"license": "SEE LICENSE IN LICENSE",
|
||||
"dependencies": {
|
||||
"@automaker/types": "^1.0.0"
|
||||
"@automaker/types": "^1.0.0",
|
||||
"p-limit": "^6.2.0"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@types/node": "^22.10.5",
|
||||
@@ -268,6 +269,33 @@
|
||||
"undici-types": "~6.21.0"
|
||||
}
|
||||
},
|
||||
"libs/platform/node_modules/p-limit": {
|
||||
"version": "6.2.0",
|
||||
"resolved": "https://registry.npmjs.org/p-limit/-/p-limit-6.2.0.tgz",
|
||||
"integrity": "sha512-kuUqqHNUqoIWp/c467RI4X6mmyuojY5jGutNU0wVTmEOOfcuwLqyMVoAi9MKi2Ak+5i9+nhmrK4ufZE8069kHA==",
|
||||
"license": "MIT",
|
||||
"dependencies": {
|
||||
"yocto-queue": "^1.1.1"
|
||||
},
|
||||
"engines": {
|
||||
"node": ">=18"
|
||||
},
|
||||
"funding": {
|
||||
"url": "https://github.com/sponsors/sindresorhus"
|
||||
}
|
||||
},
|
||||
"libs/platform/node_modules/yocto-queue": {
|
||||
"version": "1.2.2",
|
||||
"resolved": "https://registry.npmjs.org/yocto-queue/-/yocto-queue-1.2.2.tgz",
|
||||
"integrity": "sha512-4LCcse/U2MHZ63HAJVE+v71o7yOdIe4cZ70Wpf8D/IyjDKYQLV5GD46B+hSTjJsvV5PztjvHoU580EftxjDZFQ==",
|
||||
"license": "MIT",
|
||||
"engines": {
|
||||
"node": ">=12.20"
|
||||
},
|
||||
"funding": {
|
||||
"url": "https://github.com/sponsors/sindresorhus"
|
||||
}
|
||||
},
|
||||
"libs/prompts": {
|
||||
"name": "@automaker/prompts",
|
||||
"version": "1.0.0",
|
||||
|
||||
Reference in New Issue
Block a user