Config Structure Changes and Gateway Integration ## Configuration Structure Changes - Restructured .taskmasterconfig to use 'account' section for user settings - Moved userId, userEmail, mode, telemetryEnabled from global to account section - API keys remain isolated in .env file (not accessible to AI) - Enhanced getUserId() to always return value, never null (sets default '1234567890') ## Gateway Integration Enhancements - Updated registerUserWithGateway() to accept both email and userId parameters - Enhanced /auth/init endpoint integration for existing user validation - API key updates automatically written to .env during registration process - Improved user identification and validation flow ## Code Updates for New Structure - Fixed config-manager.js getter functions for account section access - Updated user-management.js to use config.account.userId/mode - Modified telemetry-submission.js to read from account section - Added getTelemetryEnabled() function with proper account section access - Enhanced telemetry configuration reading with new structure ## Comprehensive Test Updates - Updated integration tests (init-config.test.js) for new config structure - Fixed unit tests (config-manager.test.js) with updated default config - Updated telemetry tests (telemetry-submission.test.js) for account structure - Added missing getTelemetryEnabled mock to ai-services-unified.test.js - Fixed all test expectations to use config.account.* instead of config.global.* - Removed references to deprecated config.subscription object ## Configuration Access Consistency - Standardized configuration access patterns across entire codebase - Clean separation: user settings in account, API keys in .env, models/global in respective sections - All tests passing with new configuration structure - Maintained backward compatibility during transition Changes support enhanced telemetry system with proper user management and gateway integration while maintaining security through API key isolation.
385 lines
10 KiB
JavaScript
385 lines
10 KiB
JavaScript
import fs from "fs";
|
|
import path from "path";
|
|
import { submitTelemetryData } from "./telemetry-submission.js";
|
|
import { getDebugFlag } from "./config-manager.js";
|
|
import { log } from "./utils.js";
|
|
|
|
class TelemetryQueue {
|
|
constructor() {
|
|
this.queue = [];
|
|
this.processing = false;
|
|
this.backgroundInterval = null;
|
|
this.stats = {
|
|
pending: 0,
|
|
processed: 0,
|
|
failed: 0,
|
|
lastProcessedAt: null,
|
|
};
|
|
this.logFile = null;
|
|
}
|
|
|
|
/**
|
|
* Initialize the queue with comprehensive logging file path
|
|
* @param {string} projectRoot - Project root directory for log file
|
|
*/
|
|
initialize(projectRoot) {
|
|
if (projectRoot) {
|
|
this.logFile = path.join(projectRoot, ".taskmaster-activity.log");
|
|
this.loadPersistedQueue();
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Add telemetry data to queue without blocking
|
|
* @param {Object} telemetryData - Command telemetry data
|
|
*/
|
|
addToQueue(telemetryData) {
|
|
const queueItem = {
|
|
...telemetryData,
|
|
queuedAt: new Date().toISOString(),
|
|
attempts: 0,
|
|
};
|
|
|
|
this.queue.push(queueItem);
|
|
this.stats.pending = this.queue.length;
|
|
|
|
// Log the activity immediately to .log file
|
|
this.logActivity("QUEUED", {
|
|
commandName: telemetryData.commandName,
|
|
queuedAt: queueItem.queuedAt,
|
|
userId: telemetryData.userId,
|
|
success: telemetryData.success,
|
|
executionTimeMs: telemetryData.executionTimeMs,
|
|
});
|
|
|
|
if (getDebugFlag()) {
|
|
log("debug", `Added ${telemetryData.commandName} to telemetry queue`);
|
|
}
|
|
|
|
// Persist queue state if file is configured
|
|
this.persistQueue();
|
|
}
|
|
|
|
/**
|
|
* Log activity to comprehensive .log file
|
|
* @param {string} action - The action being logged (QUEUED, SUBMITTED, FAILED, etc.)
|
|
* @param {Object} data - The data to log
|
|
*/
|
|
logActivity(action, data) {
|
|
if (!this.logFile) return;
|
|
|
|
try {
|
|
const timestamp = new Date().toISOString();
|
|
const logEntry = `${timestamp} [${action}] ${JSON.stringify(data)}\n`;
|
|
|
|
fs.appendFileSync(this.logFile, logEntry);
|
|
} catch (error) {
|
|
if (getDebugFlag()) {
|
|
log("error", `Failed to write to activity log: ${error.message}`);
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Process all queued telemetry items
|
|
* @returns {Object} Processing result with stats
|
|
*/
|
|
async processQueue() {
|
|
if (this.processing || this.queue.length === 0) {
|
|
return { processed: 0, failed: 0, errors: [] };
|
|
}
|
|
|
|
this.processing = true;
|
|
const errors = [];
|
|
let processed = 0;
|
|
let failed = 0;
|
|
|
|
this.logActivity("PROCESSING_START", { queueSize: this.queue.length });
|
|
|
|
// Process items in batches to avoid overwhelming the gateway
|
|
const batchSize = 5;
|
|
const itemsToProcess = [...this.queue];
|
|
|
|
for (let i = 0; i < itemsToProcess.length; i += batchSize) {
|
|
const batch = itemsToProcess.slice(i, i + batchSize);
|
|
|
|
for (const item of batch) {
|
|
try {
|
|
item.attempts++;
|
|
const result = await submitTelemetryData(item);
|
|
|
|
if (result.success) {
|
|
// Remove from queue on success
|
|
const index = this.queue.findIndex(
|
|
(q) => q.queuedAt === item.queuedAt
|
|
);
|
|
if (index > -1) {
|
|
this.queue.splice(index, 1);
|
|
}
|
|
processed++;
|
|
|
|
// Log successful submission
|
|
this.logActivity("SUBMITTED", {
|
|
commandName: item.commandName,
|
|
queuedAt: item.queuedAt,
|
|
attempts: item.attempts,
|
|
});
|
|
} else {
|
|
// Retry failed items up to 3 times
|
|
if (item.attempts >= 3) {
|
|
const index = this.queue.findIndex(
|
|
(q) => q.queuedAt === item.queuedAt
|
|
);
|
|
if (index > -1) {
|
|
this.queue.splice(index, 1);
|
|
}
|
|
failed++;
|
|
const errorMsg = `Failed to submit ${item.commandName} after 3 attempts: ${result.error}`;
|
|
errors.push(errorMsg);
|
|
|
|
// Log final failure
|
|
this.logActivity("FAILED", {
|
|
commandName: item.commandName,
|
|
queuedAt: item.queuedAt,
|
|
attempts: item.attempts,
|
|
error: result.error,
|
|
});
|
|
} else {
|
|
// Log retry attempt
|
|
this.logActivity("RETRY", {
|
|
commandName: item.commandName,
|
|
queuedAt: item.queuedAt,
|
|
attempts: item.attempts,
|
|
error: result.error,
|
|
});
|
|
}
|
|
}
|
|
} catch (error) {
|
|
// Network or unexpected errors
|
|
if (item.attempts >= 3) {
|
|
const index = this.queue.findIndex(
|
|
(q) => q.queuedAt === item.queuedAt
|
|
);
|
|
if (index > -1) {
|
|
this.queue.splice(index, 1);
|
|
}
|
|
failed++;
|
|
const errorMsg = `Exception submitting ${item.commandName}: ${error.message}`;
|
|
errors.push(errorMsg);
|
|
|
|
// Log exception failure
|
|
this.logActivity("EXCEPTION", {
|
|
commandName: item.commandName,
|
|
queuedAt: item.queuedAt,
|
|
attempts: item.attempts,
|
|
error: error.message,
|
|
});
|
|
} else {
|
|
// Log retry for exception
|
|
this.logActivity("RETRY_EXCEPTION", {
|
|
commandName: item.commandName,
|
|
queuedAt: item.queuedAt,
|
|
attempts: item.attempts,
|
|
error: error.message,
|
|
});
|
|
}
|
|
}
|
|
}
|
|
|
|
// Small delay between batches
|
|
if (i + batchSize < itemsToProcess.length) {
|
|
await new Promise((resolve) => setTimeout(resolve, 100));
|
|
}
|
|
}
|
|
|
|
this.stats.pending = this.queue.length;
|
|
this.stats.processed += processed;
|
|
this.stats.failed += failed;
|
|
this.stats.lastProcessedAt = new Date().toISOString();
|
|
|
|
this.processing = false;
|
|
this.persistQueue();
|
|
|
|
// Log processing completion
|
|
this.logActivity("PROCESSING_COMPLETE", {
|
|
processed,
|
|
failed,
|
|
remainingInQueue: this.queue.length,
|
|
});
|
|
|
|
if (getDebugFlag() && (processed > 0 || failed > 0)) {
|
|
log(
|
|
"debug",
|
|
`Telemetry queue processed: ${processed} success, ${failed} failed`
|
|
);
|
|
}
|
|
|
|
return { processed, failed, errors };
|
|
}
|
|
|
|
/**
|
|
* Start background processing at specified interval
|
|
* @param {number} intervalMs - Processing interval in milliseconds (default: 30000)
|
|
*/
|
|
startBackgroundProcessor(intervalMs = 30000) {
|
|
if (this.backgroundInterval) {
|
|
clearInterval(this.backgroundInterval);
|
|
}
|
|
|
|
this.backgroundInterval = setInterval(async () => {
|
|
try {
|
|
await this.processQueue();
|
|
} catch (error) {
|
|
if (getDebugFlag()) {
|
|
log(
|
|
"error",
|
|
`Background telemetry processing error: ${error.message}`
|
|
);
|
|
}
|
|
}
|
|
}, intervalMs);
|
|
|
|
if (getDebugFlag()) {
|
|
log(
|
|
"debug",
|
|
`Started telemetry background processor (${intervalMs}ms interval)`
|
|
);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Stop background processing
|
|
*/
|
|
stopBackgroundProcessor() {
|
|
if (this.backgroundInterval) {
|
|
clearInterval(this.backgroundInterval);
|
|
this.backgroundInterval = null;
|
|
|
|
if (getDebugFlag()) {
|
|
log("debug", "Stopped telemetry background processor");
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Get queue statistics
|
|
* @returns {Object} Queue stats
|
|
*/
|
|
getQueueStats() {
|
|
return {
|
|
...this.stats,
|
|
pending: this.queue.length,
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Load persisted queue from file (now reads from .log file)
|
|
*/
|
|
loadPersistedQueue() {
|
|
// For the .log file, we'll look for a companion .json file for queue state
|
|
if (!this.logFile) return;
|
|
|
|
const stateFile = this.logFile.replace(".log", "-queue-state.json");
|
|
if (!fs.existsSync(stateFile)) {
|
|
return;
|
|
}
|
|
|
|
try {
|
|
const data = fs.readFileSync(stateFile, "utf8");
|
|
const persistedData = JSON.parse(data);
|
|
|
|
this.queue = persistedData.queue || [];
|
|
this.stats = { ...this.stats, ...persistedData.stats };
|
|
|
|
if (getDebugFlag()) {
|
|
log(
|
|
"debug",
|
|
`Loaded ${this.queue.length} items from telemetry queue state`
|
|
);
|
|
}
|
|
} catch (error) {
|
|
if (getDebugFlag()) {
|
|
log(
|
|
"error",
|
|
`Failed to load persisted telemetry queue: ${error.message}`
|
|
);
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Persist queue state to companion file
|
|
*/
|
|
persistQueue() {
|
|
if (!this.logFile) return;
|
|
|
|
const stateFile = this.logFile.replace(".log", "-queue-state.json");
|
|
|
|
try {
|
|
const data = {
|
|
queue: this.queue,
|
|
stats: this.stats,
|
|
lastUpdated: new Date().toISOString(),
|
|
};
|
|
|
|
fs.writeFileSync(stateFile, JSON.stringify(data, null, 2));
|
|
} catch (error) {
|
|
if (getDebugFlag()) {
|
|
log("error", `Failed to persist telemetry queue: ${error.message}`);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Global instance
|
|
const telemetryQueue = new TelemetryQueue();
|
|
|
|
/**
|
|
* Add command telemetry to queue (non-blocking)
|
|
* @param {Object} commandData - Command execution data
|
|
*/
|
|
export function queueCommandTelemetry(commandData) {
|
|
telemetryQueue.addToQueue(commandData);
|
|
}
|
|
|
|
/**
|
|
* Initialize telemetry queue with project root
|
|
* @param {string} projectRoot - Project root directory
|
|
*/
|
|
export function initializeTelemetryQueue(projectRoot) {
|
|
telemetryQueue.initialize(projectRoot);
|
|
}
|
|
|
|
/**
|
|
* Start background telemetry processing
|
|
* @param {number} intervalMs - Processing interval in milliseconds
|
|
*/
|
|
export function startTelemetryBackgroundProcessor(intervalMs = 30000) {
|
|
telemetryQueue.startBackgroundProcessor(intervalMs);
|
|
}
|
|
|
|
/**
|
|
* Stop background telemetry processing
|
|
*/
|
|
export function stopTelemetryBackgroundProcessor() {
|
|
telemetryQueue.stopBackgroundProcessor();
|
|
}
|
|
|
|
/**
|
|
* Get telemetry queue statistics
|
|
* @returns {Object} Queue statistics
|
|
*/
|
|
export function getTelemetryQueueStats() {
|
|
return telemetryQueue.getQueueStats();
|
|
}
|
|
|
|
/**
|
|
* Manually process telemetry queue
|
|
* @returns {Object} Processing result
|
|
*/
|
|
export function processTelemetryQueue() {
|
|
return telemetryQueue.processQueue();
|
|
}
|
|
|
|
export { telemetryQueue };
|