Database Operations Pattern
Use Case: Read, write, sync, and manage database data in workflows.
Pattern Structure
Trigger → [Query/Read] → [Transform] → [Write/Update] → [Verify/Log]
Key Characteristic: Data persistence and synchronization
Core Components
1. Trigger
Options:
- Schedule - Periodic sync/maintenance (most common)
- Webhook - Event-driven writes
- Manual - One-time operations
2. Database Read Nodes
Supported databases:
- Postgres
- MySQL
- MongoDB
- Microsoft SQL
- SQLite
- Redis
- And more via community nodes
3. Transform
Purpose: Map between different database schemas or formats
Typical nodes:
- Set - Field mapping
- Code - Complex transformations
- Merge - Combine data from multiple sources
4. Database Write Nodes
Operations:
- INSERT - Create new records
- UPDATE - Modify existing records
- UPSERT - Insert or update
- DELETE - Remove records
5. Verification
Purpose: Confirm operations succeeded
Methods:
- Query to verify records
- Count rows affected
- Log results
Common Use Cases
1. Data Synchronization
Flow: Schedule → Read Source DB → Transform → Write Target DB → Log
Example (Postgres to MySQL sync):
1. Schedule (every 15 minutes)
2. Postgres (SELECT * FROM users WHERE updated_at > {{$json.last_sync}})
3. IF (check if records exist)
4. Set (map Postgres schema to MySQL schema)
5. MySQL (INSERT or UPDATE users)
6. Postgres (UPDATE sync_log SET last_sync = NOW())
7. Slack (notify: "Synced X users")
Incremental sync query:
SELECT *
FROM users
WHERE updated_at > $1
ORDER BY updated_at ASC
LIMIT 1000
Parameters:
{
"parameters": [
"={{$node['Get Last Sync'].json.last_sync}}"
]
}
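The last step of the sync keeps a watermark so the next run only fetches newer rows. A minimal sketch of how a Code node might compute that watermark from a fetched batch (the `updated_at` field name is an assumption matching the query above):

```javascript
// Compute the next sync watermark from a batch of rows.
// Assumes each row has an `updated_at` ISO timestamp (hypothetical field name).
function nextWatermark(rows, previous) {
  // Keep the previous watermark when the batch is empty
  if (rows.length === 0) return previous;
  // The query orders by updated_at ASC, so the last row holds the max
  return rows[rows.length - 1].updated_at;
}

const rows = [
  { id: 1, updated_at: '2024-01-01T10:00:00Z' },
  { id: 2, updated_at: '2024-01-01T10:05:00Z' },
];
const watermark = nextWatermark(rows, '2024-01-01T09:00:00Z');
```

Storing the max `updated_at` from the batch (rather than `NOW()`) avoids missing rows written between the query and the watermark update.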
2. ETL (Extract, Transform, Load)
Flow: Extract from multiple sources → Transform → Load into warehouse
Example (Consolidate data):
1. Schedule (daily at 2 AM)
2. [Parallel branches]
├─ Postgres (SELECT orders)
├─ MySQL (SELECT customers)
└─ MongoDB (SELECT products)
3. Merge (combine all data)
4. Code (transform to warehouse schema)
5. Postgres (warehouse - INSERT into fact_sales)
6. Email (send summary report)
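Step 4 above (transform to warehouse schema) can be sketched as a Code node that joins the three result sets by id into fact rows. Field names here are illustrative assumptions, not a fixed warehouse schema:

```javascript
// Join orders to customers and products by id, one fact row per order.
function toFactSales(orders, customers, products) {
  const byId = (rows) => Object.fromEntries(rows.map((r) => [r.id, r]));
  const cust = byId(customers);
  const prod = byId(products);
  return orders.map((o) => ({
    order_id: o.id,
    customer_name: (cust[o.customer_id] || {}).name || 'unknown',
    product_name: (prod[o.product_id] || {}).name || 'unknown',
    total: o.total,
  }));
}

const facts = toFactSales(
  [{ id: 1, customer_id: 10, product_id: 20, total: 99 }],
  [{ id: 10, name: 'Acme' }],
  [{ id: 20, name: 'Widget' }]
);
```

The `'unknown'` fallback keeps the load from failing on referential gaps between the source databases; whether to drop or flag such rows instead is a design decision.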
3. Data Validation & Cleanup
Flow: Schedule → Query → Validate → Update/Delete invalid records
Example (Clean orphaned records):
1. Schedule (weekly)
2. Postgres (SELECT users WHERE email IS NULL OR email = '')
3. IF (invalid records exist)
4. Postgres (UPDATE users SET status='inactive' WHERE email IS NULL)
5. Postgres (DELETE FROM users WHERE created_at < NOW() - INTERVAL '1 year' AND status='inactive')
6. Slack (alert: "Cleaned X invalid records")
4. Backup & Archive
Flow: Schedule → Query → Export → Store
Example (Archive old records):
1. Schedule (monthly)
2. Postgres (SELECT * FROM orders WHERE created_at < NOW() - INTERVAL '2 years')
3. Code (convert to JSON)
4. Write File (save to archive.json)
5. Google Drive (upload archive)
6. Postgres (DELETE FROM orders WHERE created_at < NOW() - INTERVAL '2 years')
5. Real-time Data Updates
Flow: Webhook → Parse → Update Database
Example (Update user status):
1. Webhook (receive status update)
2. Postgres (UPDATE users SET status = {{$json.body.status}} WHERE id = {{$json.body.user_id}})
3. IF (rows affected > 0)
4. Redis (SET user:{{$json.body.user_id}}:status {{$json.body.status}})
5. Webhook Response ({"success": true})
Database Node Configuration
Postgres
SELECT Query
{
operation: "executeQuery",
query: "SELECT id, name, email FROM users WHERE created_at > $1 LIMIT $2",
parameters: [
"={{$json.since_date}}",
"100"
]
}
INSERT
{
operation: "insert",
table: "users",
columns: "id, name, email, created_at",
values: [
{
id: "={{$json.id}}",
name: "={{$json.name}}",
email: "={{$json.email}}",
created_at: "={{$now}}"
}
]
}
UPDATE
{
operation: "update",
table: "users",
updateKey: "id",
columns: "name, email, updated_at",
values: {
id: "={{$json.id}}",
name: "={{$json.name}}",
email: "={{$json.email}}",
updated_at: "={{$now}}"
}
}
UPSERT (INSERT ... ON CONFLICT)
{
operation: "executeQuery",
query: `
INSERT INTO users (id, name, email)
VALUES ($1, $2, $3)
ON CONFLICT (id)
DO UPDATE SET name = $2, email = $3, updated_at = NOW()
`,
parameters: [
"={{$json.id}}",
"={{$json.name}}",
"={{$json.email}}"
]
}
MySQL
SELECT with JOIN
{
operation: "executeQuery",
query: `
SELECT u.id, u.name, o.order_id, o.total
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
WHERE u.created_at > ?
`,
parameters: [
"={{$json.since_date}}"
]
}
Bulk INSERT
{
operation: "insert",
table: "orders",
columns: "user_id, total, status",
values: $json.orders // Array of objects
}
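The bulk insert expects one object per row with exactly the listed columns. A small sketch of shaping incoming items into that form, with type coercion and a default (column names match the example above; the `'pending'` default is an assumption):

```javascript
// Normalize raw items into row objects matching the insert's columns.
function toOrderRows(items) {
  return items.map((it) => ({
    user_id: Number(it.user_id),   // coerce string ids to numbers
    total: Number(it.total),
    status: it.status || 'pending', // hypothetical default status
  }));
}

const rows = toOrderRows([{ user_id: '5', total: '9.50' }]);
```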
MongoDB
Find Documents
{
operation: "find",
collection: "users",
query: JSON.stringify({
created_at: { $gt: new Date($json.since_date) },
status: "active"
}),
limit: 100
}
Insert Document
{
operation: "insert",
collection: "users",
document: JSON.stringify({
name: $json.name,
email: $json.email,
created_at: new Date()
})
}
Update Document
{
operation: "update",
collection: "users",
query: JSON.stringify({ _id: $json.user_id }),
update: JSON.stringify({
$set: {
status: $json.status,
updated_at: new Date()
}
})
}
Batch Processing
Pattern 1: Split In Batches
Use when: Processing large datasets to avoid memory issues
Postgres (SELECT 10000 records)
→ Split In Batches (100 items per batch)
→ Transform
→ MySQL (write batch)
→ Loop (until all processed)
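Conceptually, Split In Batches does the following: slice the result set into fixed-size chunks so each downstream write stays small. A minimal sketch:

```javascript
// Slice a large array into fixed-size batches.
function chunk(items, size) {
  const batches = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// 250 records in batches of 100 → batches of 100, 100, 50
const batches = chunk(Array.from({ length: 250 }, (_, i) => i), 100);
```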
Pattern 2: Paginated Queries
Use when: Database has millions of records
Set (initialize: offset=0, limit=1000)
→ Loop Start
→ Postgres (SELECT * FROM large_table LIMIT {{$json.limit}} OFFSET {{$json.offset}})
→ IF (records returned)
├─ Process records
├─ Set (increment offset by 1000)
└─ Loop back
└─ [No records] → End
Query:
SELECT * FROM large_table
ORDER BY id
LIMIT $1 OFFSET $2
Pattern 3: Cursor-Based Pagination
Faster than OFFSET on large tables: each page starts at an indexed id instead of scanning past all skipped rows.
Set (initialize: last_id=0)
→ Loop Start
→ Postgres (SELECT * FROM table WHERE id > {{$json.last_id}} ORDER BY id LIMIT 1000)
→ IF (records returned)
├─ Process records
├─ Code (get max id from batch)
└─ Loop back
└─ [No records] → End
Query:
SELECT * FROM table
WHERE id > $1
ORDER BY id ASC
LIMIT 1000
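The cursor loop above can be sketched as follows. `fetchPage` stands in for the Postgres node; its signature is an assumption for illustration, not an n8n API:

```javascript
// Drain a table via cursor pagination: each page resumes after the
// last id seen, relying on the ORDER BY id ASC in the query.
function drain(fetchPage, pageSize) {
  let lastId = 0;
  const all = [];
  for (;;) {
    const page = fetchPage(lastId, pageSize);
    if (page.length === 0) break; // no records → end
    all.push(...page);
    lastId = page[page.length - 1].id; // max id of the batch
  }
  return all;
}

// Fake in-memory table standing in for the database
const table = Array.from({ length: 25 }, (_, i) => ({ id: i + 1 }));
const fetchPage = (after, limit) =>
  table.filter((r) => r.id > after).slice(0, limit);

const result = drain(fetchPage, 10); // 25 rows across 3 pages
```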
Transaction Handling
Pattern 1: BEGIN/COMMIT/ROLLBACK
For databases that support transactions:
// Node 1: Begin Transaction
{
operation: "executeQuery",
query: "BEGIN"
}
// Node 2-N: Your operations
{
operation: "executeQuery",
query: "INSERT INTO ...",
continueOnFail: true
}
// Node N+1: Commit or Rollback
{
operation: "executeQuery",
query: "={{$node['Operation'].json.error ? 'ROLLBACK' : 'COMMIT'}}"
}
Pattern 2: Atomic Operations
Use database features for atomicity:
-- Upsert example (atomic)
INSERT INTO inventory (product_id, quantity)
VALUES ($1, $2)
ON CONFLICT (product_id)
DO UPDATE SET quantity = inventory.quantity + $2
Pattern 3: Error Rollback
Manual rollback on error:
Try Operations:
Postgres (INSERT orders)
MySQL (INSERT order_items)
Error Trigger:
Postgres (DELETE FROM orders WHERE id = {{$json.order_id}})
MySQL (DELETE FROM order_items WHERE order_id = {{$json.order_id}})
Data Transformation
Schema Mapping
// Code node - map schemas
const sourceData = $input.all();
return sourceData.map(item => ({
json: {
// Source → Target mapping
user_id: item.json.id,
full_name: `${item.json.first_name} ${item.json.last_name}`,
email_address: item.json.email,
registration_date: new Date(item.json.created_at).toISOString(),
// Computed fields
is_premium: item.json.plan_type === 'pro',
// Default values
status: item.json.status || 'active'
}
}));
Data Type Conversions
// Code node - convert data types
return $input.all().map(item => ({
json: {
// String to number
user_id: parseInt(item.json.user_id),
// String to date
created_at: new Date(item.json.created_at),
// Number to boolean
is_active: item.json.active === 1,
// JSON string to object
metadata: JSON.parse(item.json.metadata || '{}'),
// Null handling
email: item.json.email || null
}
}));
Aggregation
// Code node - aggregate data
const items = $input.all();
const summary = items.reduce((acc, item) => {
const date = item.json.created_at.split('T')[0];
if (!acc[date]) {
acc[date] = { count: 0, total: 0 };
}
acc[date].count++;
acc[date].total += item.json.amount;
return acc;
}, {});
return Object.entries(summary).map(([date, data]) => ({
json: {
date,
count: data.count,
total: data.total,
average: data.total / data.count
}
}));
Performance Optimization
1. Use Indexes
Ensure the database has indexes on the columns your queries filter and join on:
-- Add index for sync queries
CREATE INDEX idx_users_updated_at ON users(updated_at);
-- Add index for lookups
CREATE INDEX idx_orders_user_id ON orders(user_id);
2. Limit Result Sets
Always use LIMIT:
-- ✅ Good
SELECT * FROM large_table
WHERE created_at > $1
LIMIT 1000
-- ❌ Bad (unbounded)
SELECT * FROM large_table
WHERE created_at > $1
3. Use Prepared Statements
Parameterized queries let the database reuse cached execution plans, and they are safer:
// ✅ Good - prepared statement
{
query: "SELECT * FROM users WHERE id = $1",
parameters: ["={{$json.id}}"]
}
// ❌ Bad - string concatenation
{
query: "SELECT * FROM users WHERE id = '={{$json.id}}'"
}
4. Batch Writes
Write multiple records at once:
// ✅ Good - batch insert
{
operation: "insert",
table: "orders",
values: $json.items // Array of 100 items
}
// ❌ Bad - individual inserts in loop
// 100 separate INSERT statements
5. Connection Pooling
Configure in credentials:
{
host: "db.example.com",
database: "mydb",
user: "user",
password: "pass",
// Connection pool settings
min: 2,
max: 10,
idleTimeoutMillis: 30000
}
Error Handling
Pattern 1: Check Rows Affected
Database Operation (UPDATE users...)
→ IF ({{$json.rowsAffected === 0}})
└─ Alert: "No rows updated - record not found"
Pattern 2: Constraint Violations
// Database operation with continueOnFail: true
{
operation: "insert",
continueOnFail: true
}
// Next node: Check for errors
IF ({{$json.error !== undefined}})
→ IF ({{$json.error.includes('duplicate key')}})
└─ Log: "Record already exists - skipping"
→ ELSE
└─ Alert: "Database error: {{$json.error}}"
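The branching above amounts to classifying the error message into an action. A sketch, assuming the substrings below appear in your driver's messages (check the actual error text your database returns):

```javascript
// Map a database error message to a handling decision.
function classify(message) {
  if (!message) return 'ok';                                   // no error field
  if (message.includes('duplicate key')) return 'skip';        // unique violation
  if (message.includes('foreign key')) return 'fix-reference'; // FK violation
  return 'alert';                                              // unknown → escalate
}
```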
Pattern 3: Rollback on Error
Try Operations:
→ Database Write 1
→ Database Write 2
→ Database Write 3
Error Trigger:
→ Rollback Operations
→ Alert Admin
Security Best Practices
1. Use Parameterized Queries (Prevent SQL Injection)
// ✅ SAFE - parameterized
{
query: "SELECT * FROM users WHERE email = $1",
parameters: ["={{$json.email}}"]
}
// ❌ DANGEROUS - SQL injection risk
{
query: "SELECT * FROM users WHERE email = '={{$json.email}}'"
}
2. Least Privilege Access
Create a dedicated workflow user:
-- ✅ Good - limited permissions
CREATE USER n8n_workflow WITH PASSWORD 'secure_password';
GRANT SELECT, INSERT, UPDATE ON orders TO n8n_workflow;
GRANT SELECT ON users TO n8n_workflow;
-- ❌ Bad - too much access
GRANT ALL PRIVILEGES TO n8n_workflow;
3. Validate Input Data
// Code node - validate before write
const email = $json.email;
const name = $json.name;
// Validation
if (!email || !email.includes('@')) {
throw new Error('Invalid email address');
}
if (!name || name.length < 2) {
throw new Error('Invalid name');
}
// Sanitization
return [{
json: {
email: email.toLowerCase().trim(),
name: name.trim()
}
}];
4. Encrypt Sensitive Data
// Code node - encrypt before storage
const crypto = require('crypto');
const algorithm = 'aes-256-cbc';
const key = Buffer.from($credentials.encryptionKey, 'hex');
const iv = crypto.randomBytes(16);
const cipher = crypto.createCipheriv(algorithm, key, iv);
let encrypted = cipher.update($json.sensitive_data, 'utf8', 'hex');
encrypted += cipher.final('hex');
return [{
json: {
encrypted_data: encrypted,
iv: iv.toString('hex')
}
}];
Common Gotchas
1. ❌ Wrong: Unbounded queries
SELECT * FROM large_table -- Could return millions
✅ Correct: Use LIMIT
SELECT * FROM large_table
ORDER BY created_at DESC
LIMIT 1000
2. ❌ Wrong: String concatenation in queries
query: "SELECT * FROM users WHERE id = '{{$json.id}}'"
✅ Correct: Parameterized queries
query: "SELECT * FROM users WHERE id = $1",
parameters: ["={{$json.id}}"]
3. ❌ Wrong: No transaction for multi-step operations
INSERT into orders
INSERT into order_items // Fails → orphaned order record
✅ Correct: Use transaction
BEGIN
INSERT into orders
INSERT into order_items
COMMIT (or ROLLBACK on error)
4. ❌ Wrong: Processing all items at once
SELECT 1000000 records → Process all → OOM error
✅ Correct: Batch processing
SELECT records → Split In Batches (1000) → Process → Loop
Real Template Examples
Common shapes from the n8n template library:
Data Sync:
Schedule → Postgres (SELECT new records) → Transform → MySQL (INSERT)
ETL Pipeline:
Schedule → [Multiple DB reads] → Merge → Transform → Warehouse (INSERT)
Backup:
Schedule → Postgres (SELECT all) → JSON → Google Drive (upload)
Use search_templates({query: "database"}) to find more!
Checklist for Database Workflows
Planning
- Identify source and target databases
- Understand schema differences
- Plan transformation logic
- Consider batch size for large datasets
- Design error handling strategy
Implementation
- Use parameterized queries (never concatenate)
- Add LIMIT to all SELECT queries
- Use appropriate operation (INSERT/UPDATE/UPSERT)
- Configure credentials properly
- Test with small dataset first
Performance
- Add database indexes for queries
- Use batch operations
- Implement pagination for large datasets
- Configure connection pooling
- Monitor query execution times
Security
- Use parameterized queries (SQL injection prevention)
- Least privilege database user
- Validate and sanitize input
- Encrypt sensitive data
- Never log sensitive data
Reliability
- Add transaction handling if needed
- Check rows affected
- Handle constraint violations
- Implement retry logic
- Add Error Trigger workflow
Summary
Key Points:
- Always use parameterized queries (prevent SQL injection)
- Batch processing for large datasets
- Transaction handling for multi-step operations
- Limit result sets to avoid memory issues
- Validate input data before writes
Pattern: Trigger → Query → Transform → Write → Verify
Related:
- http_api_integration.md - Fetching data to store in DB
- scheduled_tasks.md - Periodic database maintenance