n8n-skills/skills/n8n-workflow-patterns/database_operations.md
czlonkowski 33e83c29dc refactor: Remove research context from skill content
🤖 Conceived by Romuald Członkowski - https://www.aiadvisors.pl/en
2025-10-20 11:45:04 +02:00

Database Operations Pattern

Use Case: Read, write, sync, and manage database data in workflows.


Pattern Structure

Trigger → [Query/Read] → [Transform] → [Write/Update] → [Verify/Log]

Key Characteristic: Data persistence and synchronization


Core Components

1. Trigger

Options:

  • Schedule - Periodic sync/maintenance (most common)
  • Webhook - Event-driven writes
  • Manual - One-time operations

2. Database Read Nodes

Supported databases:

  • Postgres
  • MySQL
  • MongoDB
  • Microsoft SQL
  • SQLite
  • Redis
  • And more via community nodes

3. Transform

Purpose: Map between different database schemas or formats

Typical nodes:

  • Set - Field mapping
  • Code - Complex transformations
  • Merge - Combine data from multiple sources

4. Database Write Nodes

Operations:

  • INSERT - Create new records
  • UPDATE - Modify existing records
  • UPSERT - Insert or update
  • DELETE - Remove records

5. Verification

Purpose: Confirm operations succeeded

Methods:

  • Query to verify records
  • Count rows affected
  • Log results

Common Use Cases

1. Data Synchronization

Flow: Schedule → Read Source DB → Transform → Write Target DB → Log

Example (Postgres to MySQL sync):

1. Schedule (every 15 minutes)
2. Postgres (SELECT * FROM users WHERE updated_at > {{$json.last_sync}})
3. IF (check if records exist)
4. Set (map Postgres schema to MySQL schema)
5. MySQL (INSERT or UPDATE users)
6. Postgres (UPDATE sync_log SET last_sync = NOW())
7. Slack (notify: "Synced X users")

Incremental sync query:

SELECT *
FROM users
WHERE updated_at > $1
ORDER BY updated_at ASC
LIMIT 1000

Parameters:

{
  "parameters": [
    "={{$node['Get Last Sync'].json.last_sync}}"
  ]
}
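The example flow records `NOW()` as the new `last_sync`, but the incremental query is capped at 1000 rows, so rows beyond the cap could be skipped on the next run. One possible alternative, sketched below, is to advance the checkpoint only to the newest `updated_at` that was actually read. The helper name `nextCheckpoint` and the sample rows are hypothetical; in a Code node the rows would come from `$input.all()`.

```javascript
// Hypothetical helper: derive the next checkpoint from the rows just synced.
function nextCheckpoint(rows, previousCheckpoint) {
  // No rows means nothing changed, so keep the old checkpoint.
  if (rows.length === 0) return previousCheckpoint;

  let latest = new Date(previousCheckpoint).getTime();
  for (const row of rows) {
    const t = new Date(row.updated_at).getTime();
    if (t > latest) latest = t;
  }
  return new Date(latest).toISOString();
}

// Sample batch standing in for the Postgres node output.
const batch = [
  { id: 1, updated_at: '2024-01-01T10:00:00.000Z' },
  { id: 2, updated_at: '2024-01-01T10:05:00.000Z' },
];

const checkpoint = nextCheckpoint(batch, '2024-01-01T09:00:00.000Z');
console.log(checkpoint); // 2024-01-01T10:05:00.000Z
```

Rows that share the exact same `updated_at` across a batch boundary can still be missed with a strict `>` comparison, so a tie-breaker column such as `id` may be worth adding to the query.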

2. ETL (Extract, Transform, Load)

Flow: Extract from multiple sources → Transform → Load into warehouse

Example (Consolidate data):

1. Schedule (daily at 2 AM)
2. [Parallel branches]
   ├─ Postgres (SELECT orders)
   ├─ MySQL (SELECT customers)
   └─ MongoDB (find products)
3. Merge (combine all data)
4. Code (transform to warehouse schema)
5. Postgres (warehouse - INSERT into fact_sales)
6. Email (send summary report)
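Step 4 of the example (transform to warehouse schema) is where the three sources get joined. A minimal sketch of that join follows, assuming illustrative field names (`customer_id`, `product_id`, `sku`, `price`) that are not taken from any real schema.

```javascript
// Hypothetical sample rows standing in for the three database reads.
const orders = [
  { order_id: 10, customer_id: 1, product_id: 'p1', quantity: 2 },
  { order_id: 11, customer_id: 2, product_id: 'p2', quantity: 1 },
];
const customers = [
  { id: 1, name: 'Ada' },
  { id: 2, name: 'Linus' },
];
const products = [
  { sku: 'p1', price: 5 },
  { sku: 'p2', price: 20 },
];

// Index the lookup tables by key so each order resolves in constant time.
const customerById = new Map(customers.map(c => [c.id, c]));
const productBySku = new Map(products.map(p => [p.sku, p]));

// Build one fact row per order.
const factSales = orders.map(o => {
  const customer = customerById.get(o.customer_id);
  const product = productBySku.get(o.product_id);
  return {
    order_id: o.order_id,
    customer_name: customer ? customer.name : null,
    // A missing product yields a null total rather than NaN.
    total: product ? product.price * o.quantity : null,
  };
});

console.log(factSales);
```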

3. Data Validation & Cleanup

Flow: Schedule → Query → Validate → Update/Delete invalid records

Example (Clean orphaned records):

1. Schedule (weekly)
2. Postgres (SELECT users WHERE email IS NULL OR email = '')
3. IF (invalid records exist)
4. Postgres (UPDATE users SET status='inactive' WHERE email IS NULL OR email = '')
5. Postgres (DELETE FROM users WHERE created_at < NOW() - INTERVAL '1 year' AND status='inactive')
6. Slack (alert: "Cleaned X invalid records")

4. Backup & Archive

Flow: Schedule → Query → Export → Store

Example (Archive old records):

1. Schedule (monthly)
2. Postgres (SELECT * FROM orders WHERE created_at < NOW() - INTERVAL '2 years')
3. Code (convert to JSON)
4. Write File (save to archive.json)
5. Google Drive (upload archive)
6. Postgres (DELETE FROM orders WHERE created_at < NOW() - INTERVAL '2 years')

5. Real-time Data Updates

Flow: Webhook → Parse → Update Database

Example (Update user status):

1. Webhook (receive status update)
2. Postgres (UPDATE users SET status = $1 WHERE id = $2, with parameters {{$json.body.status}} and {{$json.body.user_id}})
3. IF (rows affected > 0)
4. Redis (SET user:{{$json.body.user_id}}:status {{$json.body.status}})
5. Webhook Response ({"success": true})
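Because the webhook body is untrusted, it can help to validate it before it reaches the database node. The sketch below shows one way to do that and to produce the parameter array for the update. The function name `buildStatusUpdate` and the list of allowed statuses are assumptions made for illustration.

```javascript
// Assumed set of valid statuses; adjust to the real schema.
const ALLOWED_STATUSES = ['active', 'inactive', 'suspended'];

// Hypothetical helper: validate a webhook body and build a parameterized update.
function buildStatusUpdate(body) {
  const userId = Number(body.user_id);
  if (!Number.isInteger(userId) || userId <= 0) {
    throw new Error('user_id must be a positive integer');
  }
  if (!ALLOWED_STATUSES.includes(body.status)) {
    throw new Error(`Unsupported status: ${body.status}`);
  }
  return {
    query: 'UPDATE users SET status = $1 WHERE id = $2',
    parameters: [body.status, userId],
  };
}

const update = buildStatusUpdate({ user_id: '42', status: 'inactive' });
console.log(update.parameters); // [ 'inactive', 42 ]
```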

Database Node Configuration

Postgres

SELECT Query

{
  operation: "executeQuery",
  query: "SELECT id, name, email FROM users WHERE created_at > $1 LIMIT $2",
  parameters: [
    "={{$json.since_date}}",
    "100"
  ]
}

INSERT

{
  operation: "insert",
  table: "users",
  columns: "id, name, email, created_at",
  values: [
    {
      id: "={{$json.id}}",
      name: "={{$json.name}}",
      email: "={{$json.email}}",
      created_at: "={{$now}}"
    }
  ]
}

UPDATE

{
  operation: "update",
  table: "users",
  updateKey: "id",
  columns: "name, email, updated_at",
  values: {
    id: "={{$json.id}}",
    name: "={{$json.name}}",
    email: "={{$json.email}}",
    updated_at: "={{$now}}"
  }
}

UPSERT (INSERT ... ON CONFLICT)

{
  operation: "executeQuery",
  query: `
    INSERT INTO users (id, name, email)
    VALUES ($1, $2, $3)
    ON CONFLICT (id)
    DO UPDATE SET name = $2, email = $3, updated_at = NOW()
  `,
  parameters: [
    "={{$json.id}}",
    "={{$json.name}}",
    "={{$json.email}}"
  ]
}

MySQL

SELECT with JOIN

{
  operation: "executeQuery",
  query: `
    SELECT u.id, u.name, o.order_id, o.total
    FROM users u
    LEFT JOIN orders o ON u.id = o.user_id
    WHERE u.created_at > ?
  `,
  parameters: [
    "={{$json.since_date}}"
  ]
}

Bulk INSERT

{
  operation: "insert",
  table: "orders",
  columns: "user_id, total, status",
  values: $json.orders  // Array of objects
}
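When a bulk insert has to go through a raw query instead of the insert operation, the multi-row statement can be generated from the array of objects. The following is a sketch only: `buildBulkInsert` is a hypothetical helper, it uses MySQL-style `?` placeholders, and it assumes the table and column names come from trusted constants rather than user input.

```javascript
// Hypothetical helper: build one multi-row INSERT with positional placeholders.
function buildBulkInsert(table, columns, rows) {
  if (rows.length === 0) throw new Error('No rows to insert');

  // One "(?, ?, ?)" group per row.
  const rowPlaceholder = `(${columns.map(() => '?').join(', ')})`;
  const placeholders = rows.map(() => rowPlaceholder).join(', ');

  // Flatten values in column order; missing fields become NULL.
  const parameters = rows.flatMap(row => columns.map(col => row[col] ?? null));

  return {
    query: `INSERT INTO ${table} (${columns.join(', ')}) VALUES ${placeholders}`,
    parameters,
  };
}

const insert = buildBulkInsert('orders', ['user_id', 'total', 'status'], [
  { user_id: 1, total: 9.5, status: 'paid' },
  { user_id: 2, total: 20 },
]);
console.log(insert.query);
console.log(insert.parameters);
```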

MongoDB

Find Documents

{
  operation: "find",
  collection: "users",
  query: JSON.stringify({
    created_at: { $gt: new Date($json.since_date) },
    status: "active"
  }),
  limit: 100
}

Insert Document

{
  operation: "insert",
  collection: "users",
  document: JSON.stringify({
    name: $json.name,
    email: $json.email,
    created_at: new Date()
  })
}

Update Document

{
  operation: "update",
  collection: "users",
  query: JSON.stringify({ _id: $json.user_id }),
  update: JSON.stringify({
    $set: {
      status: $json.status,
      updated_at: new Date()
    }
  })
}

Batch Processing

Pattern 1: Split In Batches

Use when: Processing large datasets to avoid memory issues

Postgres (SELECT 10000 records)
  → Split In Batches (100 items per batch)
  → Transform
  → MySQL (write batch)
  → Loop (until all processed)
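The batching step amounts to slicing the result set into fixed-size chunks. A plain JavaScript sketch of that idea, independent of the Split In Batches node itself (the `chunk` helper is illustrative):

```javascript
// Split an array into consecutive batches of at most `size` items.
function chunk(items, size) {
  if (!Number.isInteger(size) || size <= 0) {
    throw new Error('size must be a positive integer');
  }
  const batches = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// 250 sample records split into batches of 100.
const records = Array.from({ length: 250 }, (_, i) => ({ id: i + 1 }));
const batches = chunk(records, 100);
console.log(batches.map(b => b.length)); // [ 100, 100, 50 ]
```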

Pattern 2: Paginated Queries

Use when: Database has millions of records

Set (initialize: offset=0, limit=1000)
  → Loop Start
  → Postgres (SELECT * FROM large_table ORDER BY id LIMIT {{$json.limit}} OFFSET {{$json.offset}})
  → IF (records returned)
    ├─ Process records
    ├─ Set (increment offset by 1000)
    └─ Loop back
  └─ [No records] → End

Query:

SELECT * FROM large_table
ORDER BY id
LIMIT $1 OFFSET $2
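The loop logic of this pattern can be sketched in plain JavaScript. Here an in-memory array stands in for `large_table`, and `fetchPage` is a hypothetical stand-in for the Postgres node running the query above.

```javascript
// In-memory stand-in for large_table, already ordered by id.
const largeTable = Array.from({ length: 2500 }, (_, i) => ({ id: i + 1 }));

// Stand-in for: SELECT * FROM large_table ORDER BY id LIMIT $1 OFFSET $2
function fetchPage(limit, offset) {
  return largeTable.slice(offset, offset + limit);
}

function processAllPages(limit) {
  let offset = 0;
  let processed = 0;
  const pageSizes = [];
  while (true) {
    const rows = fetchPage(limit, offset);
    if (rows.length === 0) break; // [No records] → End
    processed += rows.length;     // placeholder for "Process records"
    pageSizes.push(rows.length);
    offset += limit;              // increment offset by the page size
  }
  return { processed, pageSizes };
}

const result = processAllPages(1000);
console.log(result); // processed: 2500, pageSizes: [1000, 1000, 500]
```

With OFFSET, the database still has to walk past all skipped rows on every page, which is why deep pages tend to get slower as the offset grows.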

Pattern 3: Cursor-Based Pagination

Better performance for large datasets:

Set (initialize: last_id=0)
  → Loop Start
  → Postgres (SELECT * FROM large_table WHERE id > {{$json.last_id}} ORDER BY id LIMIT 1000)
  → IF (records returned)
    ├─ Process records
    ├─ Code (get max id from batch)
    └─ Loop back
  └─ [No records] → End

Query:

SELECT * FROM large_table
WHERE id > $1
ORDER BY id ASC
LIMIT 1000
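A sketch of the cursor loop in plain JavaScript follows. The in-memory array and the `fetchAfter` function are hypothetical stand-ins for the table and the query above; the ids deliberately have gaps to show that the cursor does not depend on consecutive values.

```javascript
// In-memory stand-in for the table, ordered by id, with gaps between ids.
const rowsById = Array.from({ length: 2500 }, (_, i) => ({ id: (i + 1) * 3 }));

// Stand-in for: SELECT * ... WHERE id > $1 ORDER BY id ASC LIMIT 1000
function fetchAfter(lastId, limit) {
  return rowsById.filter(r => r.id > lastId).slice(0, limit);
}

function scanByCursor(limit) {
  let lastId = 0;
  let processed = 0;
  while (true) {
    const rows = fetchAfter(lastId, limit);
    if (rows.length === 0) break;       // [No records] → End
    processed += rows.length;           // placeholder for "Process records"
    // Rows are ordered by id, so the last row carries the max id of the batch.
    lastId = rows[rows.length - 1].id;
  }
  return { processed, lastId };
}

const scan = scanByCursor(1000);
console.log(scan); // processed: 2500, lastId: 7500
```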

Transaction Handling

Pattern 1: BEGIN/COMMIT/ROLLBACK

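For databases that support transactions (all statements must run on the same database connection, so verify that your database node does not open a new connection per node before relying on this):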

// Node 1: Begin Transaction
{
  operation: "executeQuery",
  query: "BEGIN"
}

// Node 2-N: Your operations
{
  operation: "executeQuery",
  query: "INSERT INTO ...",
  continueOnFail: true
}

// Node N+1: Commit or Rollback
{
  operation: "executeQuery",
  query: "={{$node['Operation'].json.error ? 'ROLLBACK' : 'COMMIT'}}"
}

Pattern 2: Atomic Operations

Use database features for atomicity:

-- Upsert example (atomic)
INSERT INTO inventory (product_id, quantity)
VALUES ($1, $2)
ON CONFLICT (product_id)
DO UPDATE SET quantity = inventory.quantity + $2

Pattern 3: Error Rollback

Manual rollback on error:

Try Operations:
  Postgres (INSERT orders)
  MySQL (INSERT order_items)

Error Trigger:
  Postgres (DELETE FROM orders WHERE id = {{$json.order_id}})
  MySQL (DELETE FROM order_items WHERE order_id = {{$json.order_id}})

Data Transformation

Schema Mapping

// Code node - map schemas
const sourceData = $input.all();

return sourceData.map(item => ({
  json: {
    // Source → Target mapping
    user_id: item.json.id,
    full_name: `${item.json.first_name} ${item.json.last_name}`,
    email_address: item.json.email,
    registration_date: new Date(item.json.created_at).toISOString(),
    // Computed fields
    is_premium: item.json.plan_type === 'pro',
    // Default values
    status: item.json.status || 'active'
  }
}));

Data Type Conversions

// Code node - convert data types
return $input.all().map(item => ({
  json: {
    // String to number
    user_id: parseInt(item.json.user_id, 10),
    // String to date
    created_at: new Date(item.json.created_at),
    // Number to boolean
    is_active: item.json.active === 1,
    // JSON string to object
    metadata: JSON.parse(item.json.metadata || '{}'),
    // Null handling
    email: item.json.email || null
  }
}));
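The conversions above throw or produce `NaN` and `Invalid Date` on malformed input. A defensive variant might look like the sketch below; the helper names (`toInt`, `toIsoDate`, `parseJsonOr`, `convertRow`) are illustrative and not part of n8n.

```javascript
// Parse an integer, returning null instead of NaN.
function toInt(value) {
  const n = Number.parseInt(value, 10);
  return Number.isNaN(n) ? null : n;
}

// Convert to an ISO timestamp, returning null for missing or invalid dates.
function toIsoDate(value) {
  if (value === null || value === undefined) return null;
  const d = new Date(value);
  return Number.isNaN(d.getTime()) ? null : d.toISOString();
}

// Parse a JSON string, falling back when it is absent or malformed.
function parseJsonOr(value, fallback) {
  if (typeof value !== 'string') return fallback;
  try {
    return JSON.parse(value);
  } catch {
    return fallback;
  }
}

function convertRow(row) {
  return {
    user_id: toInt(row.user_id),
    created_at: toIsoDate(row.created_at),
    metadata: parseJsonOr(row.metadata, {}),
  };
}

const converted = convertRow({
  user_id: '42',
  created_at: '2024-03-01T12:00:00Z',
  metadata: '{bad json',
});
console.log(converted);
```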

Aggregation

// Code node - aggregate data
const items = $input.all();

const summary = items.reduce((acc, item) => {
  const date = item.json.created_at.split('T')[0];
  if (!acc[date]) {
    acc[date] = { count: 0, total: 0 };
  }
  acc[date].count++;
  acc[date].total += item.json.amount;
  return acc;
}, {});

return Object.entries(summary).map(([date, data]) => ({
  json: {
    date,
    count: data.count,
    total: data.total,
    average: data.total / data.count
  }
}));

Performance Optimization

1. Use Indexes

Ensure database has proper indexes:

-- Add index for sync queries
CREATE INDEX idx_users_updated_at ON users(updated_at);

-- Add index for lookups
CREATE INDEX idx_orders_user_id ON orders(user_id);

2. Limit Result Sets

Always use LIMIT:

-- ✅ Good
SELECT * FROM large_table
WHERE created_at > $1
LIMIT 1000

-- ❌ Bad (unbounded)
SELECT * FROM large_table
WHERE created_at > $1

3. Use Prepared Statements

Parameterized queries let the database reuse the query plan across executions and keep values out of the SQL text:

// ✅ Good - prepared statement
{
  query: "SELECT * FROM users WHERE id = $1",
  parameters: ["={{$json.id}}"]
}

// ❌ Bad - string concatenation
{
  query: "=SELECT * FROM users WHERE id = '{{$json.id}}'"
}

4. Batch Writes

Write multiple records at once:

// ✅ Good - batch insert
{
  operation: "insert",
  table: "orders",
  values: $json.items  // Array of 100 items
}

// ❌ Bad - individual inserts in loop
// 100 separate INSERT statements

5. Connection Pooling

Configure in credentials (where the credential type for your database node exposes pool settings):

{
  host: "db.example.com",
  database: "mydb",
  user: "user",
  password: "pass",
  // Connection pool settings
  min: 2,
  max: 10,
  idleTimeoutMillis: 30000
}

Error Handling

Pattern 1: Check Rows Affected

Database Operation (UPDATE users...)
  → IF ({{$json.rowsAffected === 0}})
    └─ Alert: "No rows updated - record not found"

Pattern 2: Constraint Violations

// Database operation with continueOnFail: true
{
  operation: "insert",
  continueOnFail: true
}

// Next node: Check for errors
IF ({{$json.error !== undefined}})
   IF ({{$json.error.includes('duplicate key')}})
    └─ Log: "Record already exists - skipping"
   ELSE
    └─ Alert: "Database error: {{$json.error}}"
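The nested IF checks can also be collapsed into a single Code node that classifies the error text. The sketch below assumes typical English error wording from Postgres and MySQL; exact messages vary by database version and locale, so treat the matched substrings as assumptions to verify, and `classifyDbError` as a hypothetical helper.

```javascript
// Map a database error message to a coarse category.
function classifyDbError(message) {
  const text = String(message || '').toLowerCase();
  if (text.includes('duplicate key') || text.includes('duplicate entry')) {
    return 'duplicate';
  }
  if (text.includes('foreign key')) {
    return 'foreign_key';
  }
  if (text.includes('not-null') || text.includes('cannot be null')) {
    return 'not_null';
  }
  return 'unknown';
}

const samples = [
  'duplicate key value violates unique constraint "users_pkey"',
  "Column 'email' cannot be null",
  'connection refused',
];
const categories = samples.map(classifyDbError);
console.log(categories); // [ 'duplicate', 'not_null', 'unknown' ]
```

A downstream Switch or IF node could then route on the category: skip duplicates, alert on everything else.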

Pattern 3: Rollback on Error

Try Operations:
  → Database Write 1
  → Database Write 2
  → Database Write 3

Error Trigger:
  → Rollback Operations
  → Alert Admin

Security Best Practices

1. Use Parameterized Queries (Prevent SQL Injection)

// ✅ SAFE - parameterized
{
  query: "SELECT * FROM users WHERE email = $1",
  parameters: ["={{$json.email}}"]
}

// ❌ DANGEROUS - SQL injection risk
{
  query: "=SELECT * FROM users WHERE email = '{{$json.email}}'"
}
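To make the risk concrete, the sketch below builds both forms from the same hostile input. No database is involved; it only shows how the SQL text changes. The input value is a classic illustrative payload.

```javascript
// Attacker-controlled value arriving in $json.email.
const maliciousEmail = "x' OR '1'='1";

// Concatenation: the input becomes part of the SQL and changes its logic.
const concatenated =
  "SELECT * FROM users WHERE email = '" + maliciousEmail + "'";

// Parameterization: the SQL text is fixed and the input travels separately.
const parameterized = {
  query: 'SELECT * FROM users WHERE email = $1',
  parameters: [maliciousEmail],
};

console.log(concatenated);
// SELECT * FROM users WHERE email = 'x' OR '1'='1'
console.log(parameterized.query);
// SELECT * FROM users WHERE email = $1
```

The concatenated statement now matches every row, while the parameterized one simply looks for a user whose email is that odd literal string.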

2. Least Privilege Access

Create dedicated workflow user:

-- ✅ Good - limited permissions
CREATE USER n8n_workflow WITH PASSWORD 'secure_password';
GRANT SELECT, INSERT, UPDATE ON orders TO n8n_workflow;
GRANT SELECT ON users TO n8n_workflow;

-- ❌ Bad - too much access
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO n8n_workflow;

3. Validate Input Data

// Code node - validate before write
const email = $json.email;
const name = $json.name;

// Validation
if (!email || !email.includes('@')) {
  throw new Error('Invalid email address');
}

if (!name || name.length < 2) {
  throw new Error('Invalid name');
}

// Sanitization
return [{
  json: {
    email: email.toLowerCase().trim(),
    name: name.trim()
  }
}];

4. Encrypt Sensitive Data

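// Code node - encrypt before storage
// Self-hosted n8n must allow the built-in module, e.g. NODE_FUNCTION_ALLOW_BUILTIN=crypto
const crypto = require('crypto');

const algorithm = 'aes-256-cbc';
// Placeholder key source: supply a 64-character hex string (32 bytes) from wherever your deployment stores secrets
const key = Buffer.from($credentials.encryptionKey, 'hex');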
const iv = crypto.randomBytes(16);

const cipher = crypto.createCipheriv(algorithm, key, iv);
let encrypted = cipher.update($json.sensitive_data, 'utf8', 'hex');
encrypted += cipher.final('hex');

return [{
  json: {
    encrypted_data: encrypted,
    iv: iv.toString('hex')
  }
}];
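Since the IV is stored alongside the ciphertext, the same record can be decrypted later. A round-trip sketch using Node's built-in `crypto` module follows; the `encrypt` and `decrypt` helper names and the randomly generated demo key are assumptions for illustration, and a real key would come from a secret store.

```javascript
const crypto = require('crypto');

const ALGORITHM = 'aes-256-cbc';

// Encrypt a UTF-8 string; returns hex ciphertext plus the hex IV.
function encrypt(plaintext, key) {
  const iv = crypto.randomBytes(16);
  const cipher = crypto.createCipheriv(ALGORITHM, key, iv);
  const encrypted = Buffer.concat([
    cipher.update(plaintext, 'utf8'),
    cipher.final(),
  ]);
  return { encrypted_data: encrypted.toString('hex'), iv: iv.toString('hex') };
}

// Decrypt a record produced by encrypt() using the stored IV.
function decrypt(record, key) {
  const decipher = crypto.createDecipheriv(
    ALGORITHM,
    key,
    Buffer.from(record.iv, 'hex')
  );
  const decrypted = Buffer.concat([
    decipher.update(Buffer.from(record.encrypted_data, 'hex')),
    decipher.final(),
  ]);
  return decrypted.toString('utf8');
}

// Demo only: a throwaway 32-byte key generated at runtime.
const demoKey = crypto.randomBytes(32);
const stored = encrypt('4111-1111', demoKey);
const restored = decrypt(stored, demoKey);
console.log(restored); // 4111-1111
```

CBC mode provides confidentiality but no integrity check; an authenticated mode such as `aes-256-gcm` is worth considering if tampering is a concern.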

Common Gotchas

1. Wrong: Unbounded queries

SELECT * FROM large_table  -- Could return millions

Correct: Use LIMIT

SELECT * FROM large_table
ORDER BY created_at DESC
LIMIT 1000

2. Wrong: String concatenation in queries

query: "SELECT * FROM users WHERE id = '{{$json.id}}'"

Correct: Parameterized queries

query: "SELECT * FROM users WHERE id = $1",
parameters: ["={{$json.id}}"]

3. Wrong: No transaction for multi-step operations

INSERT INTO orders
INSERT INTO order_items  -- Fails → orphaned order record

Correct: Use transaction

BEGIN
INSERT into orders
INSERT into order_items
COMMIT (or ROLLBACK on error)

4. Wrong: Processing all items at once

SELECT 1000000 records → Process all → OOM error

Correct: Batch processing

SELECT records → Split In Batches (1000) → Process → Loop

Real Template Examples

From n8n template library (456 database templates):

Data Sync:

Schedule → Postgres (SELECT new records) → Transform → MySQL (INSERT)

ETL Pipeline:

Schedule → [Multiple DB reads] → Merge → Transform → Warehouse (INSERT)

Backup:

Schedule → Postgres (SELECT all) → JSON → Google Drive (upload)

Use search_templates({query: "database"}) to find more!


Checklist for Database Workflows

Planning

  • Identify source and target databases
  • Understand schema differences
  • Plan transformation logic
  • Consider batch size for large datasets
  • Design error handling strategy

Implementation

  • Use parameterized queries (never concatenate)
  • Add LIMIT to all SELECT queries
  • Use appropriate operation (INSERT/UPDATE/UPSERT)
  • Configure credentials properly
  • Test with small dataset first

Performance

  • Add database indexes for queries
  • Use batch operations
  • Implement pagination for large datasets
  • Configure connection pooling
  • Monitor query execution times

Security

  • Use parameterized queries (SQL injection prevention)
  • Least privilege database user
  • Validate and sanitize input
  • Encrypt sensitive data
  • Never log sensitive data

Reliability

  • Add transaction handling if needed
  • Check rows affected
  • Handle constraint violations
  • Implement retry logic
  • Add Error Trigger workflow

Summary

Key Points:

  1. Always use parameterized queries (prevent SQL injection)
  2. Batch processing for large datasets
  3. Transaction handling for multi-step operations
  4. Limit result sets to avoid memory issues
  5. Validate input data before writes

Pattern: Trigger → Query → Transform → Write → Verify

Related: