feat(validator): detect broken/malformed workflow connections (#620)

Add comprehensive connection validation: unknown output keys with fix
suggestions, invalid type field detection, output/input index bounds
checking, and BFS-based trigger reachability analysis replacing simple
orphan detection.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
czlonkowski
2026-03-07 23:18:49 +01:00
parent 0998e5486e
commit bc1c00cc2e
9 changed files with 1250 additions and 171 deletions

View File

@@ -11,13 +11,28 @@ import { ExpressionFormatValidator } from './expression-format-validator';
import { NodeSimilarityService, NodeSuggestion } from './node-similarity-service';
import { NodeTypeNormalizer } from '../utils/node-type-normalizer';
import { Logger } from '../utils/logger';
import { validateAISpecificNodes, hasAINodes } from './ai-node-validator';
import { validateAISpecificNodes, hasAINodes, AI_CONNECTION_TYPES } from './ai-node-validator';
import { isAIToolSubNode } from './ai-tool-validators';
import { isTriggerNode } from '../utils/node-type-utils';
import { isNonExecutableNode } from '../utils/node-classification';
import { ToolVariantGenerator } from './tool-variant-generator';
const logger = new Logger({ prefix: '[WorkflowValidator]' });
/**
* All valid connection output keys in n8n workflows.
* Any key not in this set is malformed and should be flagged.
*/
const VALID_CONNECTION_TYPES = new Set<string>([
'main',
'error',
...AI_CONNECTION_TYPES,
// Additional AI types from n8n-workflow NodeConnectionTypes not in AI_CONNECTION_TYPES
'ai_agent',
'ai_chain',
'ai_retriever',
'ai_reranker',
]);
interface WorkflowNode {
id: string;
name: string;
@@ -40,9 +55,7 @@ interface WorkflowNode {
interface WorkflowConnection {
[sourceNode: string]: {
main?: Array<Array<{ node: string; type: string; index: number }>>;
error?: Array<Array<{ node: string; type: string; index: number }>>;
ai_tool?: Array<Array<{ node: string; type: string; index: number }>>;
[outputType: string]: Array<Array<{ node: string; type: string; index: number }>>;
};
}
@@ -612,86 +625,47 @@ export class WorkflowValidator {
continue;
}
// Check main outputs
if (outputs.main) {
this.validateConnectionOutputs(
sourceName,
outputs.main,
nodeMap,
nodeIdMap,
result,
'main'
);
}
// Detect unknown output keys and validate known ones
for (const [outputKey, outputConnections] of Object.entries(outputs)) {
if (!VALID_CONNECTION_TYPES.has(outputKey)) {
// Flag unknown connection output key
let suggestion = '';
if (/^\d+$/.test(outputKey)) {
suggestion = ` If you meant to use output index ${outputKey}, use main[${outputKey}] instead.`;
}
result.errors.push({
type: 'error',
nodeName: sourceName,
message: `Unknown connection output key "${outputKey}" on node "${sourceName}". Valid keys are: ${[...VALID_CONNECTION_TYPES].join(', ')}.${suggestion}`,
code: 'UNKNOWN_CONNECTION_KEY'
});
result.statistics.invalidConnections++;
continue;
}
// Check error outputs
if (outputs.error) {
this.validateConnectionOutputs(
sourceName,
outputs.error,
nodeMap,
nodeIdMap,
result,
'error'
);
}
if (!outputConnections || !Array.isArray(outputConnections)) continue;
// Check AI tool outputs
if (outputs.ai_tool) {
// Validate that the source node can actually output ai_tool
this.validateAIToolSource(sourceNode, result);
if (outputKey === 'ai_tool') {
this.validateAIToolSource(sourceNode, result);
}
this.validateConnectionOutputs(
sourceName,
outputs.ai_tool,
outputConnections,
nodeMap,
nodeIdMap,
result,
'ai_tool'
outputKey
);
}
}
// Check for orphaned nodes (not connected and not triggers)
const connectedNodes = new Set<string>();
// Add all source nodes
Object.keys(workflow.connections).forEach(name => connectedNodes.add(name));
// Add all target nodes
Object.values(workflow.connections).forEach(outputs => {
if (outputs.main) {
outputs.main.flat().forEach(conn => {
if (conn) connectedNodes.add(conn.node);
});
}
if (outputs.error) {
outputs.error.flat().forEach(conn => {
if (conn) connectedNodes.add(conn.node);
});
}
if (outputs.ai_tool) {
outputs.ai_tool.flat().forEach(conn => {
if (conn) connectedNodes.add(conn.node);
});
}
});
// Check for orphaned nodes (exclude sticky notes)
for (const node of workflow.nodes) {
if (node.disabled || isNonExecutableNode(node.type)) continue;
// Use shared trigger detection function for consistency
const isNodeTrigger = isTriggerNode(node.type);
if (!connectedNodes.has(node.name) && !isNodeTrigger) {
result.warnings.push({
type: 'warning',
nodeId: node.id,
nodeName: node.name,
message: 'Node is not connected to any other nodes'
});
}
// Trigger reachability analysis: BFS from all triggers to find unreachable nodes
if (profile !== 'minimal') {
this.validateTriggerReachability(workflow, result);
} else {
this.flagOrphanedNodes(workflow, result);
}
// Check for cycles (skip in minimal profile to reduce false positives)
@@ -712,19 +686,20 @@ export class WorkflowValidator {
nodeMap: Map<string, WorkflowNode>,
nodeIdMap: Map<string, WorkflowNode>,
result: WorkflowValidationResult,
outputType: 'main' | 'error' | 'ai_tool'
outputType: string
): void {
// Get source node for special validation
const sourceNode = nodeMap.get(sourceName);
// Special validation for main outputs with error handling
// Main-output-specific validation: error handling config and index bounds
if (outputType === 'main' && sourceNode) {
this.validateErrorOutputConfiguration(sourceName, sourceNode, outputs, nodeMap, result);
this.validateOutputIndexBounds(sourceNode, outputs, result);
}
outputs.forEach((outputConnections, outputIndex) => {
if (!outputConnections) return;
outputConnections.forEach(connection => {
// Check for negative index
if (connection.index < 0) {
@@ -736,6 +711,22 @@ export class WorkflowValidator {
return;
}
// Validate connection type field
if (connection.type && !VALID_CONNECTION_TYPES.has(connection.type)) {
let suggestion = '';
if (/^\d+$/.test(connection.type)) {
suggestion = ` Numeric types are not valid - use "main", "error", or an AI connection type.`;
}
result.errors.push({
type: 'error',
nodeName: sourceName,
message: `Invalid connection type "${connection.type}" in connection from "${sourceName}" to "${connection.node}". Expected "main", "error", or an AI connection type (ai_tool, ai_languageModel, etc.).${suggestion}`,
code: 'INVALID_CONNECTION_TYPE'
});
result.statistics.invalidConnections++;
return;
}
// Special validation for SplitInBatches node
// Check both full form (n8n-nodes-base.*) and short form (nodes-base.*)
const isSplitInBatches = sourceNode && (
@@ -789,11 +780,16 @@ export class WorkflowValidator {
});
} else {
result.statistics.validConnections++;
// Additional validation for AI tool connections
if (outputType === 'ai_tool') {
this.validateAIToolConnection(sourceName, targetNode, result);
}
// Input index bounds checking
if (outputType === 'main') {
this.validateInputIndexBounds(sourceName, targetNode, connection, result);
}
}
});
});
@@ -991,6 +987,221 @@ export class WorkflowValidator {
});
}
/**
* Validate that output indices don't exceed what the node type supports.
*/
private validateOutputIndexBounds(
sourceNode: WorkflowNode,
outputs: Array<Array<{ node: string; type: string; index: number }>>,
result: WorkflowValidationResult
): void {
const normalizedType = NodeTypeNormalizer.normalizeToFullForm(sourceNode.type);
const nodeInfo = this.nodeRepository.getNode(normalizedType);
if (!nodeInfo || !nodeInfo.outputs) return;
// Count main outputs from node description
let mainOutputCount: number;
if (Array.isArray(nodeInfo.outputs)) {
// outputs can be strings like "main" or objects with { type: "main" }
mainOutputCount = nodeInfo.outputs.filter((o: any) =>
typeof o === 'string' ? o === 'main' : (o.type === 'main' || !o.type)
).length;
} else {
return; // Dynamic outputs (expression string), skip check
}
if (mainOutputCount === 0) return;
// Account for dynamic output counts based on node type and parameters
const shortType = normalizedType.replace(/^(n8n-)?nodes-base\./, '');
if (shortType === 'switch') {
// Switch node: output count depends on rules configuration
const rules = sourceNode.parameters?.rules?.values || sourceNode.parameters?.rules;
if (Array.isArray(rules)) {
mainOutputCount = rules.length + 1; // rules + fallback
} else {
return; // Cannot determine dynamic output count, skip bounds check
}
}
if (shortType === 'if' || shortType === 'filter') {
mainOutputCount = 2; // true/false
}
// Account for continueErrorOutput adding an extra output
if (sourceNode.onError === 'continueErrorOutput') {
mainOutputCount += 1;
}
// Check if any output index exceeds bounds
const maxOutputIndex = outputs.length - 1;
if (maxOutputIndex >= mainOutputCount) {
// Only flag if there are actual connections at the out-of-bounds indices
for (let i = mainOutputCount; i < outputs.length; i++) {
if (outputs[i] && outputs[i].length > 0) {
result.errors.push({
type: 'error',
nodeId: sourceNode.id,
nodeName: sourceNode.name,
message: `Output index ${i} on node "${sourceNode.name}" exceeds its output count (${mainOutputCount}). ` +
`This node has ${mainOutputCount} main output(s) (indices 0-${mainOutputCount - 1}).`,
code: 'OUTPUT_INDEX_OUT_OF_BOUNDS'
});
result.statistics.invalidConnections++;
}
}
}
}
/**
* Validate that input index doesn't exceed what the target node accepts.
*/
private validateInputIndexBounds(
sourceName: string,
targetNode: WorkflowNode,
connection: { node: string; type: string; index: number },
result: WorkflowValidationResult
): void {
const normalizedType = NodeTypeNormalizer.normalizeToFullForm(targetNode.type);
const nodeInfo = this.nodeRepository.getNode(normalizedType);
if (!nodeInfo) return;
// Most nodes have 1 main input. Known exceptions:
const shortType = normalizedType.replace(/^(n8n-)?nodes-base\./, '');
let mainInputCount = 1; // Default: most nodes have 1 input
if (shortType === 'merge' || shortType === 'compareDatasets') {
mainInputCount = 2; // Merge nodes have 2 inputs
}
// Trigger nodes have 0 inputs
if (nodeInfo.isTrigger || isTriggerNode(targetNode.type)) {
mainInputCount = 0;
}
if (mainInputCount > 0 && connection.index >= mainInputCount) {
result.errors.push({
type: 'error',
nodeName: targetNode.name,
message: `Input index ${connection.index} on node "${targetNode.name}" exceeds its input count (${mainInputCount}). ` +
`Connection from "${sourceName}" targets input ${connection.index}, but this node has ${mainInputCount} main input(s) (indices 0-${mainInputCount - 1}).`,
code: 'INPUT_INDEX_OUT_OF_BOUNDS'
});
result.statistics.invalidConnections++;
}
}
/**
* Flag nodes that are not referenced in any connection (source or target).
* Used as a lightweight check when BFS reachability is not applicable.
*/
private flagOrphanedNodes(
workflow: WorkflowJson,
result: WorkflowValidationResult
): void {
const connectedNodes = new Set<string>();
for (const [sourceName, outputs] of Object.entries(workflow.connections)) {
connectedNodes.add(sourceName);
for (const outputConns of Object.values(outputs)) {
if (!Array.isArray(outputConns)) continue;
for (const conns of outputConns) {
if (!conns) continue;
for (const conn of conns) {
if (conn) connectedNodes.add(conn.node);
}
}
}
}
for (const node of workflow.nodes) {
if (node.disabled || isNonExecutableNode(node.type)) continue;
if (isTriggerNode(node.type)) continue;
if (!connectedNodes.has(node.name)) {
result.warnings.push({
type: 'warning',
nodeId: node.id,
nodeName: node.name,
message: 'Node is not connected to any other nodes'
});
}
}
}
/**
* BFS from all trigger nodes to detect unreachable nodes.
* Replaces the simple "is node in any connection" check with proper graph traversal.
*/
private validateTriggerReachability(
workflow: WorkflowJson,
result: WorkflowValidationResult
): void {
// Build adjacency list (forward direction)
const adjacency = new Map<string, Set<string>>();
for (const [sourceName, outputs] of Object.entries(workflow.connections)) {
if (!adjacency.has(sourceName)) adjacency.set(sourceName, new Set());
for (const outputConns of Object.values(outputs)) {
if (Array.isArray(outputConns)) {
for (const conns of outputConns) {
if (!conns) continue;
for (const conn of conns) {
if (conn) {
adjacency.get(sourceName)!.add(conn.node);
// Also track that the target exists in the graph
if (!adjacency.has(conn.node)) adjacency.set(conn.node, new Set());
}
}
}
}
}
}
// Identify trigger nodes
const triggerNodes: string[] = [];
for (const node of workflow.nodes) {
if (isTriggerNode(node.type) && !node.disabled) {
triggerNodes.push(node.name);
}
}
// If no trigger nodes, fall back to simple orphaned check
if (triggerNodes.length === 0) {
this.flagOrphanedNodes(workflow, result);
return;
}
// BFS from all trigger nodes
const reachable = new Set<string>();
const queue: string[] = [...triggerNodes];
for (const t of triggerNodes) reachable.add(t);
while (queue.length > 0) {
const current = queue.shift()!;
const neighbors = adjacency.get(current);
if (neighbors) {
for (const neighbor of neighbors) {
if (!reachable.has(neighbor)) {
reachable.add(neighbor);
queue.push(neighbor);
}
}
}
}
// Flag unreachable nodes
for (const node of workflow.nodes) {
if (node.disabled || isNonExecutableNode(node.type)) continue;
if (isTriggerNode(node.type)) continue;
if (!reachable.has(node.name)) {
result.warnings.push({
type: 'warning',
nodeId: node.id,
nodeName: node.name,
message: 'Node is not reachable from any trigger node'
});
}
}
}
/**
* Check if workflow has cycles
* Allow legitimate loops for SplitInBatches and similar loop nodes
@@ -1024,23 +1235,13 @@ export class WorkflowValidator {
const connections = workflow.connections[nodeName];
if (connections) {
const allTargets: string[] = [];
if (connections.main) {
connections.main.flat().forEach(conn => {
if (conn) allTargets.push(conn.node);
});
}
if (connections.error) {
connections.error.flat().forEach(conn => {
if (conn) allTargets.push(conn.node);
});
}
if (connections.ai_tool) {
connections.ai_tool.flat().forEach(conn => {
if (conn) allTargets.push(conn.node);
});
for (const outputConns of Object.values(connections)) {
if (Array.isArray(outputConns)) {
outputConns.flat().forEach(conn => {
if (conn) allTargets.push(conn.node);
});
}
}
const currentNodeType = nodeTypeMap.get(nodeName);