dbt Change Data Capture (CDC): Complete Implementation Guide
This guide covers implementing Change Data Capture (CDC) patterns in dbt for tracking and processing data changes over time. It shows how to build efficient CDC pipelines for near-real-time data synchronization and historical tracking.
1. Introduction to CDC in dbt
Change Data Capture (CDC) is a pattern used to identify and track changes in data over time. In dbt, CDC enables efficient data synchronization, historical tracking, and near-real-time analytics by processing only changed records rather than entire datasets.
Key Benefits:
- Efficiency: Process only changed data, reducing compute costs
- Real-time Analytics: Enable near-real-time data processing
- Data Lineage: Maintain complete audit trails
- Conflict Resolution: Resolve competing updates to the same record deterministically, for example by letting the highest sequence number win
2. CDC vs dbt Materialization Strategies
It's crucial to understand the distinction between Change Data Capture (CDC) as a data integration pattern and dbt's incremental materialization strategies like "merge". While they work together, they serve different purposes:
2.1 Change Data Capture (CDC) - Data Integration Pattern
CDC is a data integration pattern focused on identifying, capturing, and processing data changes from source systems:
- Purpose: Capture and identify what data has changed in source systems
- Scope: Handles change detection, ordering, and metadata enrichment
- Methods: Timestamp-based, log-based, trigger-based, snapshot comparison
- Output: Enriched change records with operation types (INSERT, UPDATE, DELETE)
- Business Value: Enables real-time data synchronization and audit trails
2.2 dbt Materialization Strategies - Data Processing Method
dbt materialization strategies determine how dbt physically applies changes to target tables:
- Purpose: Define how dbt writes data to the data warehouse
- Scope: Controls SQL operations for building/updating tables
- Strategies: append, merge, delete+insert, insert_overwrite, microbatch (availability depends on the adapter)
- Output: Updated target tables in the data warehouse
- Technical Focus: Optimizes warehouse operations and performance
2.3 How They Work Together
| Aspect | CDC Pattern | dbt Materialization | Combined Example |
| --- | --- | --- | --- |
| Data Source | Captures changes from OLTP systems | Processes data from staging tables | CDC captures database logs → dbt processes with merge strategy |
| Change Detection | Identifies what changed and when | Uses CDC output to determine actions | CDC identifies updated orders → merge strategy updates dimension table |
| Metadata Handling | Adds operation types, timestamps, sequences | Uses metadata for conditional logic | CDC adds _operation_type → merge uses it for INSERT/UPDATE/DELETE |
| Performance | Reduces data volume by filtering changes | Optimizes warehouse operations | CDC processes 1000 changed records → merge updates only those rows |
2.4 Practical Example: CDC + Merge Strategy
-- Step 1: CDC Pattern captures changes from source
-- models/staging/stg_customers_cdc.sql
WITH cdc_changes AS (
SELECT
customer_id,
customer_name,
email,
updated_at,
_operation_type, -- CDC metadata: INSERT, UPDATE, DELETE
_sequence_number -- CDC ordering
FROM {{ source('cdc_logs', 'customer_changes') }}
WHERE updated_at > '{{ var("last_processed_timestamp") }}'
)
SELECT * FROM cdc_changes
-- Step 2: dbt Merge Strategy applies changes to target
-- models/marts/dim_customers.sql
{{
config(
materialized='incremental',
incremental_strategy='merge', -- dbt materialization strategy
unique_key='customer_id'
)
}}
SELECT
customer_id,
customer_name,
email,
updated_at,
CASE
WHEN _operation_type = 'DELETE' THEN TRUE
ELSE FALSE
END as is_deleted
FROM {{ ref('stg_customers_cdc') }}
-- The merge strategy will:
-- - INSERT rows for new customer_ids
-- - UPDATE rows for existing customer_ids
-- - Keep deleted customers as rows flagged is_deleted = TRUE (a soft delete; merge does not remove them)
2.5 Key Distinctions Summary
Think of it this way:
- CDC answers: "What changed in my source systems and when?"
- Materialization strategies answer: "How should dbt apply these changes to my warehouse?"
- CDC is the input pattern that identifies and enriches change data
- Materialization is the output method that applies changes efficiently
- You use both together: CDC to capture changes + appropriate materialization strategy to apply them
Common Misconception:
The "merge" incremental strategy is not CDC itself. It's a SQL operation (MERGE statement) that dbt uses to apply changes. CDC is the broader pattern of identifying and processing those changes in the first place.
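To make the split concrete, here is a minimal Python sketch of what a merge on a unique key does with CDC output. It is an illustration only, not dbt's implementation: `apply_changes` is a hypothetical helper, and the field names simply mirror the example in 2.4. The change records play the role of the CDC pattern; the function plays the role of the materialization strategy.

```python
from typing import Dict, List


def apply_changes(target: Dict[int, dict], changes: List[dict]) -> Dict[int, dict]:
    """Apply CDC change records to a keyed target, mimicking a merge on customer_id."""
    # Apply in sequence order so the latest change per key is written last.
    for change in sorted(changes, key=lambda c: c["_sequence_number"]):
        row = {
            "customer_name": change["customer_name"],
            # Deletes are kept as flagged rows (soft delete), as in the 2.4 model.
            "is_deleted": change["_operation_type"] == "DELETE",
        }
        # New key: insert. Existing key: overwrite.
        target[change["customer_id"]] = row
    return target


changes = [
    {"customer_id": 1, "customer_name": "Ada", "_operation_type": "INSERT", "_sequence_number": 1},
    {"customer_id": 2, "customer_name": "Bob", "_operation_type": "INSERT", "_sequence_number": 2},
    {"customer_id": 1, "customer_name": "Ada L.", "_operation_type": "UPDATE", "_sequence_number": 3},
    {"customer_id": 2, "customer_name": "Bob", "_operation_type": "DELETE", "_sequence_number": 4},
]
dim_customers = apply_changes({}, changes)
```

After the run, customer 1 carries the updated name and customer 2 is still present but flagged as deleted.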
3. CDC Fundamentals
3.1 CDC Types
| CDC Type | Description | Use Case | Complexity |
| --- | --- | --- | --- |
| Timestamp-Based | Uses modification timestamps | Simple incremental loads | Low |
| Log-Based | Reads database transaction logs | Real-time replication | High |
| Trigger-Based | Database triggers capture changes | Detailed audit trails | Medium |
| Snapshot Comparison | Compares full snapshots | Batch processing | Medium |
3.2 CDC Architecture in dbt
-- CDC Architecture Components
Source System  →  CDC Capture  →  Staging Area  →  dbt Processing  →  Target Tables
      ↓                ↓                ↓                 ↓                  ↓
 Operational       Change Log      Raw Changes       Transformed        Final Data
  Database         Extraction        Staging           Changes           Warehouse
4. CDC Implementation Patterns
4.1 Incremental Model Pattern
-- models/staging/stg_orders_cdc.sql
{{
config(
materialized='incremental',
unique_key='order_id',
on_schema_change='sync_all_columns'
)
}}
WITH source_data AS (
SELECT
order_id,
customer_id,
order_status,
order_date,
updated_at,
_operation_type,
_sequence_number
FROM {{ ref('raw_orders_cdc') }}
{% if is_incremental() %}
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}
),
-- Deduplicate based on sequence number (latest change wins)
deduplicated AS (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY order_id
ORDER BY _sequence_number DESC
) as rn
FROM source_data
)
SELECT *
FROM deduplicated
WHERE rn = 1
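The "latest change wins" rule can be sketched outside the warehouse as follows. This is an assumption-laden illustration: `latest_change_per_key` is a hypothetical helper, and it presumes that `_sequence_number` increases with every change to a given `order_id`.

```python
def latest_change_per_key(changes, key="order_id", seq="_sequence_number"):
    """Keep only the change with the highest sequence number for each key."""
    latest = {}
    for change in changes:
        current = latest.get(change[key])
        # Replace the stored change only when this one is newer.
        if current is None or change[seq] > current[seq]:
            latest[change[key]] = change
    return list(latest.values())


batch = [
    {"order_id": 10, "order_status": "placed", "_sequence_number": 1},
    {"order_id": 10, "order_status": "shipped", "_sequence_number": 3},
    {"order_id": 10, "order_status": "paid", "_sequence_number": 2},
    {"order_id": 11, "order_status": "placed", "_sequence_number": 4},
]
deduplicated = latest_change_per_key(batch)
```

The out-of-order "paid" record is discarded because a change with a higher sequence number for the same order has already been seen.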
4.2 SCD Type 2 with CDC
-- models/marts/dim_customer_scd2.sql
{{
config(
materialized='incremental',
unique_key='surrogate_key'
)
}}
WITH source_changes AS (
SELECT
customer_id,
customer_name,
email,
address,
phone,
updated_at,
_operation_type,
_sequence_number
FROM {{ ref('stg_customers_cdc') }}
{% if is_incremental() %}
WHERE updated_at > (SELECT MAX(effective_end_date) FROM {{ this }} WHERE effective_end_date IS NOT NULL)
OR updated_at > (SELECT MAX(effective_start_date) FROM {{ this }} WHERE effective_end_date IS NULL)
{% endif %}
),
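-- Generate surrogate keys and validity periods
-- This simplified model opens a new version per change but does not close the previous one (see section 9.1 for expiration)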
scd_logic AS (
SELECT
{{ dbt_utils.generate_surrogate_key(['customer_id', 'updated_at']) }} as surrogate_key,
customer_id,
customer_name,
email,
address,
phone,
updated_at as effective_start_date,
CASE
WHEN _operation_type = 'DELETE' THEN updated_at
ELSE NULL
END as effective_end_date,
_operation_type,
CASE WHEN _operation_type = 'DELETE' THEN FALSE ELSE TRUE END as is_current
FROM source_changes
)
SELECT * FROM scd_logic
5. Timestamp-Based CDC
5.1 Basic Timestamp CDC
-- models/staging/stg_products_timestamp_cdc.sql
{{
config(
materialized='incremental',
unique_key='product_id'
)
}}
SELECT
product_id,
product_name,
category_id,
price,
created_at,
updated_at,
-- Add CDC metadata
CURRENT_TIMESTAMP as processed_at,
'UPSERT' as cdc_operation
FROM {{ source('ecommerce', 'products') }}
{% if is_incremental() %}
-- Only process records updated since last run
WHERE updated_at > (
SELECT COALESCE(MAX(updated_at), '1900-01-01'::timestamp)
FROM {{ this }}
)
{% endif %}
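The incremental filter in this model is a high-water mark: only rows newer than the newest row already loaded are read. A small Python sketch of the same idea, with a hypothetical `rows_to_process` helper and made-up sample rows:

```python
from datetime import datetime

# Plays the same role as the '1900-01-01' default in the model above.
FALLBACK = datetime(1900, 1, 1)


def rows_to_process(source_rows, target_rows):
    """Return source rows whose updated_at is later than anything already loaded."""
    high_water_mark = max((row["updated_at"] for row in target_rows), default=FALLBACK)
    return [row for row in source_rows if row["updated_at"] > high_water_mark]


source = [
    {"product_id": 1, "updated_at": datetime(2024, 1, 1, 9, 0)},
    {"product_id": 2, "updated_at": datetime(2024, 1, 2, 9, 0)},
    {"product_id": 3, "updated_at": datetime(2024, 1, 3, 9, 0)},
]
already_loaded = source[:2]
pending = rows_to_process(source, already_loaded)  # incremental run
first_run = rows_to_process(source, [])            # empty target: everything qualifies
```

Because the comparison is strict, a row that arrives late with a timestamp at or below the high-water mark is skipped, which is the main weakness of pure timestamp-based CDC.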
5.2 Advanced Timestamp CDC with Deletes
-- models/staging/stg_customers_advanced_cdc.sql
{{
config(
materialized='incremental',
unique_key='customer_id'
)
}}
WITH current_data AS (
SELECT
customer_id,
customer_name,
email,
status,
updated_at,
FALSE as is_deleted
FROM {{ source('crm', 'customers') }}
WHERE status != 'DELETED'
{% if is_incremental() %}
AND updated_at > (
SELECT COALESCE(MAX(updated_at), '1900-01-01'::timestamp)
FROM {{ this }}
WHERE NOT is_deleted
)
{% endif %}
),
deleted_data AS (
SELECT
customer_id,
customer_name,
email,
status,
updated_at,
TRUE as is_deleted
FROM {{ source('crm', 'customers') }}
WHERE status = 'DELETED'
{% if is_incremental() %}
AND updated_at > (
SELECT COALESCE(MAX(updated_at), '1900-01-01'::timestamp)
FROM {{ this }}
WHERE is_deleted
)
{% endif %}
)
SELECT * FROM current_data
UNION ALL
SELECT * FROM deleted_data
6. Log-Based CDC
6.1 Processing WAL/Binlog Data
-- models/staging/stg_orders_log_based_cdc.sql
{{
config(
materialized='incremental',
unique_key=['order_id', 'log_sequence_number']
)
}}
WITH log_entries AS (
SELECT
-- Parse log entry data
JSON_EXTRACT_SCALAR(log_data, '$.order_id') as order_id,
JSON_EXTRACT_SCALAR(log_data, '$.customer_id') as customer_id,
JSON_EXTRACT_SCALAR(log_data, '$.order_status') as order_status,
JSON_EXTRACT_SCALAR(log_data, '$.total_amount') as total_amount,
-- CDC metadata from log
operation_type, -- INSERT, UPDATE, DELETE
log_sequence_number,
transaction_id,
commit_timestamp,
-- Row change metadata
CASE operation_type
WHEN 'UPDATE' THEN JSON_EXTRACT(log_data, '$.before')
ELSE NULL
END as before_values,
CASE operation_type
WHEN 'DELETE' THEN JSON_EXTRACT(log_data, '$.before')
ELSE JSON_EXTRACT(log_data, '$.after')
END as after_values
FROM {{ source('cdc_logs', 'order_changes') }}
{% if is_incremental() %}
WHERE log_sequence_number > (
SELECT COALESCE(MAX(log_sequence_number), 0)
FROM {{ this }}
)
{% endif %}
),
-- Process changes with conflict resolution
processed_changes AS (
SELECT
*,
-- Determine final operation after deduplication
LAST_VALUE(operation_type) OVER (
PARTITION BY order_id
ORDER BY log_sequence_number
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) as final_operation,
-- Mark as latest change for this key
ROW_NUMBER() OVER (
PARTITION BY order_id
ORDER BY log_sequence_number DESC
) = 1 as is_latest_change
FROM log_entries
)
SELECT * FROM processed_changes
6.2 Log-Based CDC Deduplication
-- models/marts/fct_orders_current_state.sql
-- Reconstruct current state from log entries
WITH latest_operations AS (
SELECT
order_id,
customer_id,
order_status,
total_amount,
final_operation,
commit_timestamp,
-- Only keep the final state per order
ROW_NUMBER() OVER (
PARTITION BY order_id
ORDER BY log_sequence_number DESC
) = 1 as is_current_state
FROM {{ ref('stg_orders_log_based_cdc') }}
WHERE is_latest_change
),
current_orders AS (
SELECT
order_id,
customer_id,
order_status,
total_amount,
commit_timestamp as last_updated_at
FROM latest_operations
WHERE is_current_state
AND final_operation != 'DELETE' -- Exclude deleted records
)
SELECT * FROM current_orders
7. Trigger-Based CDC
7.1 Processing Audit Table Data
-- models/staging/stg_inventory_audit_cdc.sql
{{
config(
materialized='incremental',
unique_key='audit_id'
)
}}
WITH audit_changes AS (
SELECT
audit_id,
table_name,
operation_type, -- I, U, D
primary_key_value,
changed_columns,
old_values,
new_values,
changed_by,
changed_at,
-- Parse the changed data
CASE
WHEN operation_type = 'I' THEN new_values
WHEN operation_type = 'D' THEN old_values
ELSE new_values
END as current_values
FROM {{ source('audit', 'inventory_audit') }}
WHERE table_name = 'inventory'
{% if is_incremental() %}
AND changed_at > (
SELECT COALESCE(MAX(changed_at), '1900-01-01'::timestamp)
FROM {{ this }}
)
{% endif %}
),
-- Extract inventory fields from JSON values
parsed_inventory AS (
SELECT
audit_id,
primary_key_value::INTEGER as inventory_id,
operation_type,
changed_by,
changed_at,
-- Extract fields from JSON
JSON_EXTRACT_SCALAR(current_values, '$.product_id')::INTEGER as product_id,
JSON_EXTRACT_SCALAR(current_values, '$.quantity_on_hand')::INTEGER as quantity_on_hand,
JSON_EXTRACT_SCALAR(current_values, '$.reserved_quantity')::INTEGER as reserved_quantity,
JSON_EXTRACT_SCALAR(current_values, '$.location_code') as location_code,
-- Track what changed
changed_columns
FROM audit_changes
)
SELECT * FROM parsed_inventory
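The choice between the old and new images can be sketched in Python. The audit row layout below is an assumption that mirrors the columns used in the model, and `current_values` and `parse_inventory` are hypothetical helpers:

```python
import json


def current_values(audit_row):
    """Pick the JSON image that best describes the row for this operation."""
    # Deletes only carry a meaningful "old" image; inserts and updates use the "new" one.
    if audit_row["operation_type"] == "D":
        raw = audit_row["old_values"]
    else:
        raw = audit_row["new_values"]
    return json.loads(raw)


def parse_inventory(audit_row):
    """Flatten one audit row into typed inventory fields."""
    values = current_values(audit_row)
    return {
        "audit_id": audit_row["audit_id"],
        "inventory_id": int(audit_row["primary_key_value"]),
        "operation_type": audit_row["operation_type"],
        "product_id": int(values["product_id"]),
        "quantity_on_hand": int(values["quantity_on_hand"]),
    }


update_row = {
    "audit_id": 1, "operation_type": "U", "primary_key_value": "42",
    "old_values": '{"product_id": 7, "quantity_on_hand": 10}',
    "new_values": '{"product_id": 7, "quantity_on_hand": 8}',
}
delete_row = {
    "audit_id": 2, "operation_type": "D", "primary_key_value": "42",
    "old_values": '{"product_id": 7, "quantity_on_hand": 8}',
    "new_values": None,
}
parsed_update = parse_inventory(update_row)
parsed_delete = parse_inventory(delete_row)
```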
8. Snapshot Comparison CDC
8.1 Full Snapshot Comparison
-- models/staging/stg_suppliers_snapshot_cdc.sql
{{
config(
materialized='incremental',
unique_key='supplier_id'
)
}}
{% if is_incremental() %}
WITH current_snapshot AS (
SELECT
supplier_id,
supplier_name,
contact_email,
phone_number,
address,
credit_rating,
CURRENT_DATE as snapshot_date,
{{ dbt_utils.generate_surrogate_key([
'supplier_name', 'contact_email', 'phone_number',
'address', 'credit_rating'
]) }} as content_hash
FROM {{ source('procurement', 'suppliers') }}
),
previous_snapshot AS (
SELECT
supplier_id,
content_hash as previous_content_hash
FROM {{ this }}
-- unique_key keeps one row per supplier, so compare against every stored row not already marked deleted
WHERE change_type != 'DELETE'
),
-- Identify changes
changes AS (
SELECT
c.*,
p.previous_content_hash,
CASE
WHEN p.supplier_id IS NULL THEN 'INSERT'
WHEN c.content_hash != p.previous_content_hash THEN 'UPDATE'
ELSE 'NO_CHANGE'
END as change_type
FROM current_snapshot c
LEFT JOIN previous_snapshot p ON c.supplier_id = p.supplier_id
),
-- Also identify deletes
deletes AS (
SELECT
p.supplier_id,
NULL as supplier_name,
NULL as contact_email,
NULL as phone_number,
NULL as address,
NULL as credit_rating,
CURRENT_DATE as snapshot_date,
NULL as content_hash,
NULL as previous_content_hash,
'DELETE' as change_type
FROM previous_snapshot p
LEFT JOIN current_snapshot c ON p.supplier_id = c.supplier_id
WHERE c.supplier_id IS NULL
)
SELECT * FROM changes
WHERE change_type != 'NO_CHANGE'
UNION ALL
SELECT * FROM deletes
{% else %}
-- Initial load
SELECT
supplier_id,
supplier_name,
contact_email,
phone_number,
address,
credit_rating,
CURRENT_DATE as snapshot_date,
{{ dbt_utils.generate_surrogate_key([
'supplier_name', 'contact_email', 'phone_number',
'address', 'credit_rating'
]) }} as content_hash,
NULL as previous_content_hash,
'INITIAL_LOAD' as change_type
FROM {{ source('procurement', 'suppliers') }}
{% endif %}
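The comparison logic can be sketched in Python as below. This is a simplified illustration: `content_hash` uses SHA-256 over a hand-picked column list and is not the hash that `dbt_utils.generate_surrogate_key` produces, and `diff_snapshots` is a hypothetical helper.

```python
import hashlib

TRACKED_COLUMNS = ("supplier_name", "contact_email", "credit_rating")


def content_hash(row):
    """Hash the tracked columns so two rows can be compared with one equality check."""
    # A separator keeps ("ab", "c") and ("a", "bc") from hashing to the same value.
    joined = "|".join("" if row[col] is None else str(row[col]) for col in TRACKED_COLUMNS)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


def diff_snapshots(previous, current):
    """Classify each supplier_id as INSERT, UPDATE, or DELETE; unchanged rows are omitted."""
    changes = {}
    for supplier_id, row in current.items():
        if supplier_id not in previous:
            changes[supplier_id] = "INSERT"
        elif content_hash(row) != content_hash(previous[supplier_id]):
            changes[supplier_id] = "UPDATE"
    for supplier_id in previous:
        if supplier_id not in current:
            changes[supplier_id] = "DELETE"
    return changes


previous = {
    1: {"supplier_name": "Acme", "contact_email": "a@acme.test", "credit_rating": "A"},
    2: {"supplier_name": "Bolt", "contact_email": "b@bolt.test", "credit_rating": "B"},
    3: {"supplier_name": "Cord", "contact_email": "c@cord.test", "credit_rating": "A"},
}
current = {
    1: {"supplier_name": "Acme", "contact_email": "a@acme.test", "credit_rating": "A"},
    2: {"supplier_name": "Bolt", "contact_email": "b@bolt.test", "credit_rating": "C"},
    4: {"supplier_name": "Dyne", "contact_email": "d@dyne.test", "credit_rating": "B"},
}
detected = diff_snapshots(previous, current)
```

Detecting deletes requires the previous snapshot to contain every known supplier, not only the rows that changed in the last run.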
9. SCD Integration with CDC
9.1 SCD Type 2 with CDC Processing
-- models/marts/dim_product_scd2_cdc.sql
{{
config(
materialized='incremental',
unique_key='surrogate_key',
on_schema_change='sync_all_columns'
)
}}
WITH cdc_changes AS (
SELECT *
FROM {{ ref('stg_products_timestamp_cdc') }}
{% if is_incremental() %}
WHERE processed_at > (
SELECT COALESCE(MAX(processed_at), '1900-01-01'::timestamp)
FROM {{ this }}
)
{% endif %}
),
-- Get current active records for comparison
{% if is_incremental() %}
current_active AS (
SELECT *
FROM {{ this }}
WHERE is_current = TRUE
),
{% endif %}
-- Process Type 2 changes
scd_processing AS (
SELECT
{{ dbt_utils.generate_surrogate_key(['product_id', 'updated_at']) }} as surrogate_key,
product_id,
product_name,
category_id,
price,
updated_at as effective_start_date,
NULL as effective_end_date,
TRUE as is_current,
cdc_operation,
processed_at
FROM cdc_changes
WHERE cdc_operation != 'DELETE'
),
-- Handle record expiration for updates
{% if is_incremental() %}
expired_records AS (
SELECT
c.surrogate_key,
c.product_id,
c.product_name,
c.category_id,
c.price,
c.effective_start_date,
cc.updated_at as effective_end_date, -- Close the record
FALSE as is_current,
c.cdc_operation,
c.processed_at
FROM current_active c
INNER JOIN cdc_changes cc ON c.product_id = cc.product_id
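-- The upstream timestamp model labels every change 'UPSERT', so it must be matched here too
WHERE cc.cdc_operation IN ('UPSERT', 'UPDATE', 'DELETE')
-- Only close a version when the incoming change is newer than it
AND cc.updated_at > c.effective_start_date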
),
{% endif %}
-- Combine all changes
final_changes AS (
SELECT * FROM scd_processing
{% if is_incremental() %}
UNION ALL
SELECT * FROM expired_records
{% endif %}
)
SELECT * FROM final_changes
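The close-then-open step at the heart of SCD Type 2 can be sketched in a few lines of Python. This is a toy model under stated assumptions: `apply_scd2_change` is a hypothetical helper, history is an in-memory list, and dates are ISO strings.

```python
def apply_scd2_change(history, change):
    """Close the current version for the key, then open a new one unless it is a delete."""
    for version in history:
        if version["product_id"] == change["product_id"] and version["is_current"]:
            version["effective_end_date"] = change["updated_at"]
            version["is_current"] = False
    if change["cdc_operation"] != "DELETE":
        history.append({
            "product_id": change["product_id"],
            "price": change["price"],
            "effective_start_date": change["updated_at"],
            "effective_end_date": None,
            "is_current": True,
        })
    return history


history = []
apply_scd2_change(history, {"product_id": 1, "price": 10.0, "updated_at": "2024-01-01", "cdc_operation": "UPSERT"})
apply_scd2_change(history, {"product_id": 1, "price": 12.0, "updated_at": "2024-02-01", "cdc_operation": "UPSERT"})
after_update = [dict(version) for version in history]  # copy the state before the delete
apply_scd2_change(history, {"product_id": 1, "price": None, "updated_at": "2024-03-01", "cdc_operation": "DELETE"})
```

At any point in time at most one version per key has `is_current` set, and a delete closes the last version without opening a new one.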
10. Real-Time CDC Processing
10.1 Micro-Batch Processing
-- models/staging/stg_events_microbatch_cdc.sql
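-- begin is required by the microbatch strategy; the date below is a placeholder for your earliest batch
{{
config(
materialized='incremental',
incremental_strategy='microbatch',
event_time='event_timestamp',
batch_size='hour',
begin='2024-01-01',
unique_key='event_id'
)
}}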
WITH event_stream AS (
SELECT
event_id,
user_id,
event_type,
event_data,
event_timestamp,
ingested_at,
-- Add microbatch processing metadata
DATE_TRUNC('hour', event_timestamp) as batch_hour,
LAG(event_timestamp) OVER (
PARTITION BY user_id
ORDER BY event_timestamp
) as previous_event_time
FROM {{ source('events', 'user_events') }}
-- No manual time filter is needed: with the microbatch strategy dbt filters this source
-- to the current batch window, provided the source declares its own event_time
),
-- Process changes within the microbatch
processed_events AS (
SELECT
*,
-- Calculate time between events
COALESCE(
EXTRACT(EPOCH FROM (event_timestamp - previous_event_time))/60,
0
) as minutes_since_last_event,
-- Detect session boundaries
CASE
WHEN previous_event_time IS NULL THEN 'SESSION_START'
WHEN EXTRACT(EPOCH FROM (event_timestamp - previous_event_time)) > 1800 THEN 'SESSION_START'
ELSE 'SESSION_CONTINUE'
END as session_indicator
FROM event_stream
)
SELECT * FROM processed_events
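The session boundary rule (a new session starts after more than 1800 seconds of inactivity) can be sketched in Python. `label_sessions` is a hypothetical helper and expects one user's event times in ascending order:

```python
from datetime import datetime

SESSION_GAP_SECONDS = 1800  # same threshold as the model: 30 minutes


def label_sessions(event_times):
    """Label each event as the start or continuation of a session."""
    labels = []
    previous = None
    for ts in event_times:
        # The first event, or one arriving after a long gap, opens a new session.
        if previous is None or (ts - previous).total_seconds() > SESSION_GAP_SECONDS:
            labels.append("SESSION_START")
        else:
            labels.append("SESSION_CONTINUE")
        previous = ts
    return labels


events = [
    datetime(2024, 1, 1, 10, 0),
    datetime(2024, 1, 1, 10, 10),  # 10 minutes later
    datetime(2024, 1, 1, 10, 45),  # 35 minutes later
    datetime(2024, 1, 1, 11, 15),  # exactly 30 minutes later
]
session_labels = label_sessions(events)
```

When events are processed one batch at a time, the first event of each batch has no predecessor inside that batch, so it is labeled as a session start unless the previous batch is also consulted.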
10.2 Real-Time Aggregation with CDC
-- models/marts/fct_user_activity_realtime.sql
{{
config(
materialized='incremental',
unique_key=['user_id', 'activity_hour'],
on_schema_change='sync_all_columns'
)
}}
WITH hourly_activity AS (
SELECT
user_id,
DATE_TRUNC('hour', event_timestamp) as activity_hour,
COUNT(*) as event_count,
COUNT(DISTINCT event_type) as unique_event_types,
MIN(event_timestamp) as first_event_time,
MAX(event_timestamp) as last_event_time,
-- Session metrics
COUNT(CASE WHEN session_indicator = 'SESSION_START' THEN 1 END) as new_sessions,
SUM(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END) as purchase_events,
-- Real-time processing metadata
MAX(ingested_at) as last_processed_at,
CURRENT_TIMESTAMP as aggregation_timestamp
FROM {{ ref('stg_events_microbatch_cdc') }}
{% if is_incremental() %}
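-- Column aliases are not visible in WHERE, so repeat the expression;
-- >= reprocesses the latest hour, which may have been partial on the previous run
WHERE DATE_TRUNC('hour', event_timestamp) >= (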
SELECT COALESCE(MAX(activity_hour), '1900-01-01'::timestamp)
FROM {{ this }}
)
{% endif %}
GROUP BY user_id, activity_hour
)
SELECT * FROM hourly_activity
11. Performance Optimization
11.1 Partition-Based CDC Processing
-- models/staging/stg_transactions_partitioned_cdc.sql
{{
config(
materialized='incremental',
unique_key='transaction_id',
partition_by={
'field': 'transaction_date',
'data_type': 'date',
'granularity': 'day'
},
cluster_by=['account_id', 'transaction_type']
)
}}
WITH partitioned_source AS (
SELECT
transaction_id,
account_id,
transaction_type,
amount,
currency,
transaction_timestamp,
DATE(transaction_timestamp) as transaction_date,
updated_at,
-- Partition metadata for optimization
DATE_TRUNC('month', transaction_timestamp) as partition_month
FROM {{ source('banking', 'transactions') }}
{% if is_incremental() %}
-- Leverage partition pruning
WHERE DATE(transaction_timestamp) >= CURRENT_DATE - INTERVAL '7 days'
AND updated_at > (
SELECT COALESCE(MAX(updated_at), '1900-01-01'::timestamp)
FROM {{ this }}
WHERE transaction_date >= CURRENT_DATE - INTERVAL '7 days'
)
{% endif %}
),
-- Add processing optimizations
optimized_processing AS (
SELECT
*,
-- Pre-calculate common aggregation keys
{{ dbt_utils.generate_surrogate_key(['account_id', 'transaction_date']) }} as daily_account_key,
-- Add change detection hash for efficiency
{{ dbt_utils.generate_surrogate_key([
'amount', 'currency', 'transaction_type'
]) }} as transaction_content_hash
FROM partitioned_source
)
SELECT * FROM optimized_processing
11.2 Incremental Strategy Optimization
-- macros/optimize_cdc_merge.sql
{% macro optimize_cdc_merge(target_relation, temp_relation, unique_key, dest_columns) %}
{% set merge_sql %}
MERGE {{ target_relation }} as target
USING (
SELECT * FROM {{ temp_relation }}
) as source
ON {{ unique_key_to_sql(unique_key, 'target', 'source') }}
-- MERGE applies the first WHEN clause that matches, so deletes are checked before updates
WHEN MATCHED AND source._operation_type = 'DELETE' THEN
DELETE
WHEN MATCHED AND (
-- Only update if content actually changed
target.content_hash != source.content_hash OR
target.updated_at < source.updated_at
) THEN
UPDATE SET
{% for column in dest_columns -%}
{{ column.name }} = source.{{ column.name }}
{%- if not loop.last -%},{%- endif %}
{%- endfor %}
-- Never insert a row whose only change is a delete
WHEN NOT MATCHED AND source._operation_type != 'DELETE' THEN
INSERT (
{%- for column in dest_columns -%}
{{ column.name }}
{%- if not loop.last -%},{%- endif %}
{%- endfor -%}
)
VALUES (
{%- for column in dest_columns -%}
source.{{ column.name }}
{%- if not loop.last -%},{%- endif %}
{%- endfor -%}
)
{% endset %}
{{ return(merge_sql) }}
{% endmacro %}
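One way to reason about the branches of a conditional merge is to write the decision as a plain function. The sketch below is an illustration rather than the macro's behavior: `merge_action` is a hypothetical helper, and it checks deletes before updates because a MERGE statement applies the first WHEN clause that matches.

```python
def merge_action(target_row, source_row):
    """Decide what a conditional merge should do for one incoming change."""
    is_delete = source_row["_operation_type"] == "DELETE"
    if target_row is None:
        # Nothing to delete, and a delete should not create a row.
        return "SKIP" if is_delete else "INSERT"
    if is_delete:
        return "DELETE"
    changed = target_row["content_hash"] != source_row["content_hash"]
    newer = target_row["updated_at"] < source_row["updated_at"]
    return "UPDATE" if changed or newer else "SKIP"


existing = {"content_hash": "abc", "updated_at": "2024-01-01"}
actions = [
    merge_action(None, {"_operation_type": "INSERT", "content_hash": "abc", "updated_at": "2024-01-01"}),
    merge_action(None, {"_operation_type": "DELETE", "content_hash": "abc", "updated_at": "2024-01-01"}),
    merge_action(existing, {"_operation_type": "DELETE", "content_hash": "xyz", "updated_at": "2024-01-02"}),
    merge_action(existing, {"_operation_type": "UPDATE", "content_hash": "xyz", "updated_at": "2024-01-02"}),
    merge_action(existing, {"_operation_type": "UPDATE", "content_hash": "abc", "updated_at": "2024-01-01"}),
]
```

The last case is the optimization the macro aims for: an incoming row that is neither different nor newer causes no write at all.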
12. Monitoring and Alerting
12.1 CDC Processing Metrics
-- models/monitoring/cdc_processing_metrics.sql
WITH cdc_runs AS (
SELECT
model_name,
run_started_at,
run_completed_at,
status,
rows_affected,
execution_time_seconds,
-- Calculate processing lag
EXTRACT(EPOCH FROM (
run_completed_at -
(SELECT MAX(updated_at) FROM {{ ref('stg_source_table') }})
))/60 as processing_lag_minutes,
-- Detect anomalies
rows_affected - LAG(rows_affected, 1, 0) OVER (
PARTITION BY model_name
ORDER BY run_started_at
) as row_change_from_previous,
execution_time_seconds - LAG(execution_time_seconds, 1, 0) OVER (
PARTITION BY model_name
ORDER BY run_started_at
) as execution_time_change
FROM {{ source('dbt_metadata', 'model_runs') }}
WHERE model_name LIKE '%_cdc'
),
metrics_summary AS (
SELECT
model_name,
run_completed_at,
-- SLA metrics
CASE
WHEN processing_lag_minutes > 30 THEN 'SLA_BREACH'
WHEN processing_lag_minutes > 15 THEN 'SLA_WARNING'
ELSE 'SLA_OK'
END as sla_status,
-- Anomaly detection
CASE
WHEN ABS(row_change_from_previous) > (
AVG(ABS(row_change_from_previous)) OVER (
PARTITION BY model_name
ORDER BY run_started_at
ROWS BETWEEN 10 PRECEDING AND 1 PRECEDING
) * 3
) THEN 'ANOMALY_DETECTED'
ELSE 'NORMAL'
END as volume_anomaly,
processing_lag_minutes,
execution_time_seconds,
rows_affected
FROM cdc_runs
WHERE run_completed_at >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
)
SELECT * FROM metrics_summary
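The volume anomaly rule loosely follows this shape: compare each run's change in row count with three times the average absolute change over the preceding runs. A Python sketch with a hypothetical `volume_anomalies` helper and made-up run sizes:

```python
def volume_anomalies(row_counts, window=10, factor=3):
    """Flag runs whose change in row count far exceeds the recent average change."""
    # Change versus the previous run; the first run is compared with 0, like LAG(..., 1, 0).
    changes = [
        abs(count - (row_counts[i - 1] if i > 0 else 0))
        for i, count in enumerate(row_counts)
    ]
    flags = []
    for i, change in enumerate(changes):
        baseline = changes[max(0, i - window):i]  # up to `window` preceding runs
        if not baseline:
            # No history yet: the SQL average would be NULL, which falls through to NORMAL.
            flags.append("NORMAL")
            continue
        average = sum(baseline) / len(baseline)
        flags.append("ANOMALY_DETECTED" if change > average * factor else "NORMAL")
    return flags


run_sizes = [100, 105, 98, 102, 500]
flags = volume_anomalies(run_sizes)
```

The first run contributes a large change (its full row count) to the baseline, so early anomalies can be masked until more history accumulates.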
12.2 Data Quality Monitoring for CDC
-- tests/generic/test_cdc_data_quality.sql
-- Generic test for CDC data quality issues
-- Assumes the tested model exposes updated_at and sequence_number columns
{% test cdc_data_quality(model, unique_key, partition_filter=none) %}
{% set row_filter = partition_filter if partition_filter else '1=1' %}
WITH checks AS (
SELECT
'duplicate_keys' as test_type,
COUNT(*) as failure_count,
'Found duplicate keys in CDC processing' as error_message
FROM (
SELECT {{ unique_key }}
FROM {{ model }}
WHERE {{ row_filter }}
GROUP BY {{ unique_key }}
HAVING COUNT(*) > 1
) duplicate_keys
UNION ALL
SELECT
'missing_timestamps' as test_type,
COUNT(*) as failure_count,
'Found records with missing CDC timestamps' as error_message
FROM {{ model }}
WHERE updated_at IS NULL
AND {{ row_filter }}
UNION ALL
SELECT
'future_timestamps' as test_type,
COUNT(*) as failure_count,
'Found records with future timestamps' as error_message
FROM {{ model }}
WHERE updated_at > CURRENT_TIMESTAMP + INTERVAL '1 hour'
AND {{ row_filter }}
UNION ALL
SELECT
'sequence_gaps' as test_type,
COUNT(*) as failure_count,
'Found gaps in sequence numbers' as error_message
FROM (
SELECT
sequence_number,
LAG(sequence_number) OVER (ORDER BY sequence_number) as prev_seq
FROM {{ model }}
WHERE {{ row_filter }}
) sequence_pairs
WHERE sequence_number - prev_seq > 1
)
-- dbt treats every returned row as a failure, so return only the checks that found problems
SELECT * FROM checks
WHERE failure_count > 0
{% endtest %}
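The sequence gap check compares each sequence number with its predecessor. A minimal Python sketch, with a hypothetical `find_sequence_gaps` helper:

```python
def find_sequence_gaps(sequence_numbers):
    """Return (previous, current) pairs where one or more sequence numbers are missing."""
    ordered = sorted(sequence_numbers)
    return [
        (previous, current)
        for previous, current in zip(ordered, ordered[1:])
        if current - previous > 1
    ]


gaps = find_sequence_gaps([1, 2, 3, 5, 6, 9])
no_gaps = find_sequence_gaps([4, 5, 6])
```

A gap is only a problem when the source guarantees contiguous sequence numbers; many log-based tools emit increasing but non-contiguous positions, in which case this check would report false positives.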
13. Troubleshooting Guide
Common CDC Issues and Solutions
1. Missing Changes
- Cause: Incorrect timestamp filtering or timezone issues
- Solution: Ensure consistent timezone handling and inclusive time ranges
- Prevention: Store timestamps in UTC and subtract a small lookback window from the high-water mark so late-arriving rows are re-read
2. Duplicate Processing
- Cause: Non-deterministic incremental logic or concurrent runs
- Solution: Implement proper deduplication and run locks
- Prevention: Use unique keys and sequence numbers
3. Performance Degradation
- Cause: Large incremental windows, or missing partitioning, clustering, or indexes on the filter columns
- Solution: Optimize the partition strategy and add clustering keys or indexes where the warehouse supports them
- Prevention: Monitor processing volumes and execution times
4. Data Consistency Issues
- Cause: Out-of-order processing or incomplete transactions
- Solution: Implement proper ordering and transaction boundaries
- Prevention: Use sequence numbers and transaction IDs
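For the missing-changes case (issue 1), the usual remedy is to normalize the high-water mark to UTC and move it back by a lookback window. A Python sketch follows; `incremental_window_start` is a hypothetical helper and the five-minute default is an arbitrary assumption, not a recommended value.

```python
from datetime import datetime, timedelta, timezone


def incremental_window_start(high_water_mark, lookback=timedelta(minutes=5)):
    """Return the UTC timestamp from which the next incremental run should read."""
    if high_water_mark.tzinfo is None:
        # A naive timestamp is ambiguous, which is how timezone bugs slip in.
        raise ValueError("high_water_mark must be timezone-aware")
    return high_water_mark.astimezone(timezone.utc) - lookback


# High-water mark recorded in a UTC+2 zone: 12:00 local time is 10:00 UTC.
local_mark = datetime(2024, 1, 1, 12, 0, tzinfo=timezone(timedelta(hours=2)))
window_start = incremental_window_start(local_mark)
```

Rows inside the lookback window are read twice, so this approach depends on the target being deduplicated by unique key (issue 2), for example through a merge.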
14. Code Examples
For detailed implementation examples and ready-to-use code templates, see the code examples repository that accompanies this guide.
15. Best Practices
15.1 Design Principles
- Idempotency: Ensure CDC processes can be safely re-run
- Monotonic Processing: Process changes in chronological order
- Complete Transactions: Handle transaction boundaries properly
- Schema Evolution: Plan for source schema changes
- Error Handling: Implement robust error recovery mechanisms
15.2 Implementation Guidelines
-- CDC Best Practices Checklist
1. ✅ Use unique keys for deduplication
2. ✅ Implement proper timestamp handling (UTC)
3. ✅ Add sequence numbers for ordering
4. ✅ Include operation type metadata
5. ✅ Handle deletes explicitly
6. ✅ Monitor processing lag and volumes
7. ✅ Test with edge cases (late arrivals, duplicates)
8. ✅ Document data lineage and dependencies
9. ✅ Implement circuit breakers for large volumes
10. ✅ Plan for disaster recovery scenarios
15.3 Production Readiness
| Category | Requirement | Implementation |
| --- | --- | --- |
| Monitoring | Processing lag alerts | SLA-based alerting on processing delays |
| Data Quality | Automated testing | dbt tests for duplicates, gaps, consistency |
| Performance | Scalable architecture | Partitioning, clustering, micro-batches |
| Recovery | Backfill capability | Historical reprocessing workflows |
| Documentation | Operational runbooks | Troubleshooting guides and escalation |
16. Advanced Patterns
16.1 Multi-Source CDC Federation
-- models/staging/stg_customer_360_cdc.sql
-- Federate CDC from multiple customer data sources
WITH crm_changes AS (
SELECT
'CRM' as source_system,
customer_id,
email,
phone,
NULL as billing_address,
updated_at,
_operation_type
FROM {{ ref('stg_crm_customers_cdc') }}
),
billing_changes AS (
SELECT
'BILLING' as source_system,
customer_id,
NULL as email,
NULL as phone,
billing_address,
updated_at,
_operation_type
FROM {{ ref('stg_billing_customers_cdc') }}
),
-- Merge and resolve conflicts
federated_changes AS (
SELECT * FROM crm_changes
UNION ALL
SELECT * FROM billing_changes
),
-- Apply precedence rules (CRM wins for contact info, Billing for address)
resolved_changes AS (
-- Window functions over ungrouped columns cannot be combined with GROUP BY,
-- so the per-customer duplicates are collapsed with DISTINCT instead
SELECT DISTINCT
customer_id,
FIRST_VALUE(email) OVER (
PARTITION BY customer_id
ORDER BY CASE WHEN source_system = 'CRM' THEN 0 ELSE 1 END, updated_at DESC
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) as email,
FIRST_VALUE(phone) OVER (
PARTITION BY customer_id
ORDER BY CASE WHEN source_system = 'CRM' THEN 0 ELSE 1 END, updated_at DESC
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) as phone,
FIRST_VALUE(billing_address) OVER (
PARTITION BY customer_id
ORDER BY CASE WHEN source_system = 'BILLING' THEN 0 ELSE 1 END, updated_at DESC
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) as billing_address,
MAX(updated_at) OVER (PARTITION BY customer_id) as last_updated_at
FROM federated_changes
)
SELECT * FROM resolved_changes
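The precedence rule (CRM wins for contact info, Billing for address) can be sketched in Python. `resolve_field` is a hypothetical helper, and unlike the FIRST_VALUE expressions above it skips records where the field is missing, which is an assumption about the desired behavior.

```python
def resolve_field(records, field, preferred_source):
    """Pick a field value: preferred source first, then the most recent record."""
    candidates = [r for r in records if r.get(field) is not None]
    if not candidates:
        return None
    # Two stable sorts: newest first, then preferred source ahead of the rest.
    candidates.sort(key=lambda r: r["updated_at"], reverse=True)
    candidates.sort(key=lambda r: r["source_system"] != preferred_source)
    return candidates[0][field]


records = [
    {"source_system": "CRM", "email": "old@example.com", "billing_address": None, "updated_at": "2024-01-01"},
    {"source_system": "CRM", "email": "new@example.com", "billing_address": None, "updated_at": "2024-03-01"},
    {"source_system": "BILLING", "email": None, "billing_address": "1 Main St", "updated_at": "2024-02-01"},
]
resolved = {
    "email": resolve_field(records, "email", "CRM"),
    "phone": resolve_field(records, "phone", "CRM"),
    "billing_address": resolve_field(records, "billing_address", "BILLING"),
}
```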
16.2 Event Sourcing with CDC
-- models/marts/customer_event_store.sql
-- Implement event sourcing pattern with CDC
{{
config(
materialized='incremental',
unique_key='event_id',
order_by='event_timestamp'
)
}}
WITH source_events AS (
SELECT
{{ dbt_utils.generate_surrogate_key(['customer_id', 'updated_at', 'source_system']) }} as event_id,
customer_id,
'CUSTOMER_UPDATED' as event_type,
updated_at as event_timestamp,
-- Event payload with before/after state
JSON_BUILD_OBJECT(
'before', JSON_BUILD_OBJECT(
'email', LAG(email) OVER (PARTITION BY customer_id ORDER BY updated_at),
'phone', LAG(phone) OVER (PARTITION BY customer_id ORDER BY updated_at),
'status', LAG(status) OVER (PARTITION BY customer_id ORDER BY updated_at)
),
'after', JSON_BUILD_OBJECT(
'email', email,
'phone', phone,
'status', status
),
'source_system', source_system,
'operation_type', _operation_type
) as event_payload,
-- Event metadata
source_system,
_operation_type,
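ROW_NUMBER() OVER (ORDER BY updated_at)
{% if is_incremental() %}
-- Continue numbering after the highest sequence already stored, so the number stays global across runs
+ (SELECT COALESCE(MAX(global_sequence_number), 0) FROM {{ this }})
{% endif %}
as global_sequence_number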
FROM {{ ref('stg_customer_360_cdc') }}
{% if is_incremental() %}
WHERE updated_at > (SELECT COALESCE(MAX(event_timestamp), '1900-01-01'::timestamp) FROM {{ this }})
{% endif %}
)
SELECT * FROM source_events
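The before/after payload can be sketched in Python by remembering the last known state per customer. `build_events` is a hypothetical helper, and the sample changes are made up:

```python
def build_events(changes):
    """Turn ordered change records into events carrying before and after state."""
    last_state = {}  # customer_id -> most recent "after" image
    events = []
    ordered = sorted(changes, key=lambda c: c["updated_at"])
    for sequence, change in enumerate(ordered, start=1):
        after = {"email": change["email"], "status": change["status"]}
        events.append({
            "global_sequence_number": sequence,
            "customer_id": change["customer_id"],
            "event_type": "CUSTOMER_UPDATED",
            "before": last_state.get(change["customer_id"]),  # None for the first event
            "after": after,
        })
        last_state[change["customer_id"]] = after
    return events


changes = [
    {"customer_id": 1, "email": "a@example.com", "status": "active", "updated_at": "2024-01-01"},
    {"customer_id": 2, "email": "c@example.com", "status": "active", "updated_at": "2024-01-15"},
    {"customer_id": 1, "email": "b@example.com", "status": "active", "updated_at": "2024-02-01"},
]
events = build_events(changes)
```

In an incremental run, both the last known state and the highest sequence number have to come from the existing event store; otherwise the first event of each run has an empty "before" image and numbering restarts at 1.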
🎯 Summary
This guide covers Change Data Capture patterns in dbt, from basic timestamp-based approaches to event sourcing architectures. Together, these patterns support efficient, scalable data synchronization and historical tracking.
Key Takeaways:
- CDC vs Materialization: CDC identifies what changed (data pattern), materialization strategies control how to apply changes (SQL execution)
- Choose the appropriate CDC pattern based on your data sources and requirements
- Implement proper deduplication, ordering, and conflict resolution
- Monitor processing performance and data quality continuously
- Plan for schema evolution and operational complexities
- Test thoroughly with real-world scenarios and edge cases
Official dbt Documentation: For the most up-to-date information on dbt incremental models and CDC patterns, visit dbt Incremental Models and dbt Incremental Strategies.