252 lines
7.8 KiB
JavaScript
252 lines
7.8 KiB
JavaScript
import { v4 as uuidv4 } from 'uuid';
|
|
|
|
class AsyncOperationManager {
|
|
constructor() {
|
|
this.operations = new Map(); // Stores active operation state
|
|
this.completedOperations = new Map(); // Stores completed operations
|
|
this.maxCompletedOperations = 100; // Maximum number of completed operations to store
|
|
this.listeners = new Map(); // For potential future notifications
|
|
}
|
|
|
|
/**
|
|
* Adds an operation to be executed asynchronously.
|
|
* @param {Function} operationFn - The async function to execute (e.g., a Direct function).
|
|
* @param {Object} args - Arguments to pass to the operationFn.
|
|
* @param {Object} context - The MCP tool context { log, reportProgress, session }.
|
|
* @returns {string} The unique ID assigned to this operation.
|
|
*/
|
|
addOperation(operationFn, args, context) {
|
|
const operationId = `op-${uuidv4()}`;
|
|
const operation = {
|
|
id: operationId,
|
|
status: 'pending',
|
|
startTime: Date.now(),
|
|
endTime: null,
|
|
result: null,
|
|
error: null,
|
|
// Store necessary parts of context, especially log for background execution
|
|
log: context.log,
|
|
reportProgress: context.reportProgress, // Pass reportProgress through
|
|
session: context.session // Pass session through if needed by the operationFn
|
|
};
|
|
this.operations.set(operationId, operation);
|
|
this.log(operationId, 'info', `Operation added.`);
|
|
|
|
// Start execution in the background (don't await here)
|
|
this._runOperation(operationId, operationFn, args, context).catch((err) => {
|
|
// Catch unexpected errors during the async execution setup itself
|
|
this.log(
|
|
operationId,
|
|
'error',
|
|
`Critical error starting operation: ${err.message}`,
|
|
{ stack: err.stack }
|
|
);
|
|
operation.status = 'failed';
|
|
operation.error = {
|
|
code: 'MANAGER_EXECUTION_ERROR',
|
|
message: err.message
|
|
};
|
|
operation.endTime = Date.now();
|
|
|
|
// Move to completed operations
|
|
this._moveToCompleted(operationId);
|
|
});
|
|
|
|
return operationId;
|
|
}
|
|
|
|
/**
|
|
* Internal function to execute the operation.
|
|
* @param {string} operationId - The ID of the operation.
|
|
* @param {Function} operationFn - The async function to execute.
|
|
* @param {Object} args - Arguments for the function.
|
|
* @param {Object} context - The original MCP tool context.
|
|
*/
|
|
async _runOperation(operationId, operationFn, args, context) {
|
|
const operation = this.operations.get(operationId);
|
|
if (!operation) return; // Should not happen
|
|
|
|
operation.status = 'running';
|
|
this.log(operationId, 'info', `Operation running.`);
|
|
this.emit('statusChanged', { operationId, status: 'running' });
|
|
|
|
try {
|
|
// Pass the necessary context parts to the direct function
|
|
// The direct function needs to be adapted if it needs reportProgress
|
|
// We pass the original context's log, plus our wrapped reportProgress
|
|
const result = await operationFn(args, operation.log, {
|
|
reportProgress: (progress) =>
|
|
this._handleProgress(operationId, progress),
|
|
mcpLog: operation.log, // Pass log as mcpLog if direct fn expects it
|
|
session: operation.session
|
|
});
|
|
|
|
operation.status = result.success ? 'completed' : 'failed';
|
|
operation.result = result.success ? result.data : null;
|
|
operation.error = result.success ? null : result.error;
|
|
this.log(
|
|
operationId,
|
|
'info',
|
|
`Operation finished with status: ${operation.status}`
|
|
);
|
|
} catch (error) {
|
|
this.log(
|
|
operationId,
|
|
'error',
|
|
`Operation failed with error: ${error.message}`,
|
|
{ stack: error.stack }
|
|
);
|
|
operation.status = 'failed';
|
|
operation.error = {
|
|
code: 'OPERATION_EXECUTION_ERROR',
|
|
message: error.message
|
|
};
|
|
} finally {
|
|
operation.endTime = Date.now();
|
|
this.emit('statusChanged', {
|
|
operationId,
|
|
status: operation.status,
|
|
result: operation.result,
|
|
error: operation.error
|
|
});
|
|
|
|
// Move to completed operations if done or failed
|
|
if (operation.status === 'completed' || operation.status === 'failed') {
|
|
this._moveToCompleted(operationId);
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Move an operation from active operations to completed operations history.
|
|
* @param {string} operationId - The ID of the operation to move.
|
|
* @private
|
|
*/
|
|
_moveToCompleted(operationId) {
|
|
const operation = this.operations.get(operationId);
|
|
if (!operation) return;
|
|
|
|
// Store only the necessary data in completed operations
|
|
const completedData = {
|
|
id: operation.id,
|
|
status: operation.status,
|
|
startTime: operation.startTime,
|
|
endTime: operation.endTime,
|
|
result: operation.result,
|
|
error: operation.error
|
|
};
|
|
|
|
this.completedOperations.set(operationId, completedData);
|
|
this.operations.delete(operationId);
|
|
|
|
// Trim completed operations if exceeding maximum
|
|
if (this.completedOperations.size > this.maxCompletedOperations) {
|
|
// Get the oldest operation (sorted by endTime)
|
|
const oldest = [...this.completedOperations.entries()].sort(
|
|
(a, b) => a[1].endTime - b[1].endTime
|
|
)[0];
|
|
|
|
if (oldest) {
|
|
this.completedOperations.delete(oldest[0]);
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Handles progress updates from the running operation and forwards them.
|
|
* @param {string} operationId - The ID of the operation reporting progress.
|
|
* @param {Object} progress - The progress object { progress, total? }.
|
|
*/
|
|
_handleProgress(operationId, progress) {
|
|
const operation = this.operations.get(operationId);
|
|
if (operation && operation.reportProgress) {
|
|
try {
|
|
// Use the reportProgress function captured from the original context
|
|
operation.reportProgress(progress);
|
|
this.log(
|
|
operationId,
|
|
'debug',
|
|
`Reported progress: ${JSON.stringify(progress)}`
|
|
);
|
|
} catch (err) {
|
|
this.log(
|
|
operationId,
|
|
'warn',
|
|
`Failed to report progress: ${err.message}`
|
|
);
|
|
// Don't stop the operation, just log the reporting failure
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Retrieves the status and result/error of an operation.
|
|
* @param {string} operationId - The ID of the operation.
|
|
* @returns {Object | null} The operation details or null if not found.
|
|
*/
|
|
getStatus(operationId) {
|
|
// First check active operations
|
|
const operation = this.operations.get(operationId);
|
|
if (operation) {
|
|
return {
|
|
id: operation.id,
|
|
status: operation.status,
|
|
startTime: operation.startTime,
|
|
endTime: operation.endTime,
|
|
result: operation.result,
|
|
error: operation.error
|
|
};
|
|
}
|
|
|
|
// Then check completed operations
|
|
const completedOperation = this.completedOperations.get(operationId);
|
|
if (completedOperation) {
|
|
return completedOperation;
|
|
}
|
|
|
|
// Operation not found in either active or completed
|
|
return {
|
|
error: {
|
|
code: 'OPERATION_NOT_FOUND',
|
|
message: `Operation ID ${operationId} not found. It may have been completed and removed from history, or the ID may be invalid.`
|
|
},
|
|
status: 'not_found'
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Internal logging helper to prefix logs with the operation ID.
|
|
* @param {string} operationId - The ID of the operation.
|
|
* @param {'info'|'warn'|'error'|'debug'} level - Log level.
|
|
* @param {string} message - Log message.
|
|
* @param {Object} [meta] - Additional metadata.
|
|
*/
|
|
log(operationId, level, message, meta = {}) {
|
|
const operation = this.operations.get(operationId);
|
|
// Use the logger instance associated with the operation if available, otherwise console
|
|
const logger = operation?.log || console;
|
|
const logFn = logger[level] || logger.log || console.log; // Fallback
|
|
logFn(`[AsyncOp ${operationId}] ${message}`, meta);
|
|
}
|
|
|
|
// --- Basic Event Emitter ---
|
|
on(eventName, listener) {
|
|
if (!this.listeners.has(eventName)) {
|
|
this.listeners.set(eventName, []);
|
|
}
|
|
this.listeners.get(eventName).push(listener);
|
|
}
|
|
|
|
emit(eventName, data) {
|
|
if (this.listeners.has(eventName)) {
|
|
this.listeners.get(eventName).forEach((listener) => listener(data));
|
|
}
|
|
}
|
|
}
|
|
|
|
// Export a singleton instance
|
|
const asyncOperationManager = new AsyncOperationManager();
|
|
|
|
// Export the manager and potentially the class if needed elsewhere
|
|
export { asyncOperationManager, AsyncOperationManager };
|