dbt Change Data Capture (CDC): Complete Implementation Guide

This comprehensive guide covers implementing Change Data Capture (CDC) patterns in dbt for tracking and processing data changes over time. Learn to build robust, efficient CDC solutions for real-time data synchronization and historical tracking.

Table of Contents

1. Introduction to CDC in dbt
2. CDC vs dbt Materialization Strategies
3. CDC Fundamentals
4. CDC Implementation Patterns
5. Timestamp-Based CDC
6. Log-Based CDC
7. Trigger-Based CDC
8. Snapshot Comparison CDC
9. SCD Integration with CDC
10. Real-Time CDC Processing
11. Performance Optimization
12. Monitoring and Alerting
13. Troubleshooting Guide
14. Code Examples
15. Best Practices
16. Advanced Patterns

1. Introduction to CDC in dbt

Change Data Capture (CDC) is a pattern used to identify and track changes in data over time. In dbt, CDC enables efficient data synchronization, historical tracking, and real-time analytics by processing only changed records rather than entire datasets.

Key Benefits:

- Reduced processing volume: only changed records are transformed, not entire datasets
- Faster, cheaper incremental runs against the warehouse
- A durable history of how records change over time
- Near-real-time synchronization between source systems and analytics tables
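The core mechanic behind these benefits is a filter that restricts each run to records newer than what the target table already holds. A minimal sketch, assuming a source table with an `updated_at` column (all names here are illustrative):

```sql
-- Minimal sketch of a CDC-style incremental filter (illustrative names)
{{ config(materialized='incremental', unique_key='id') }}

SELECT id, payload, updated_at
FROM {{ source('app', 'events') }}
{% if is_incremental() %}
-- Incremental runs only pull rows changed since the last successful load
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}
```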

2. CDC vs dbt Materialization Strategies

It's crucial to understand the distinction between Change Data Capture (CDC) as a data integration pattern and dbt's incremental materialization strategies like "merge". While they work together, they serve different purposes:

2.1 Change Data Capture (CDC) - Data Integration Pattern

CDC is a data integration pattern focused on identifying, capturing, and processing data changes from source systems:

- Detects inserts, updates, and deletes in the source (via timestamps, transaction logs, triggers, or snapshot comparison)
- Attaches change metadata such as operation type, change timestamp, and sequence number
- Delivers only the changed records downstream, reducing data volume

2.2 dbt Materialization Strategies - Data Processing Method

dbt materialization strategies determine how dbt physically applies changes to target tables:

- `append`: insert new rows only
- `merge`: insert, update, or delete rows matched on a unique key
- `delete+insert`: delete matching rows, then insert their new versions
- `insert_overwrite`: replace entire partitions of the target table
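The strategy is declared in the model's config block; a quick illustration (model and key names assumed):

```sql
-- Illustrative: selecting an incremental strategy for a model
{{ config(
    materialized='incremental',
    incremental_strategy='merge',   -- or 'append', 'delete+insert', 'insert_overwrite'
    unique_key='customer_id'
) }}
```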

2.3 How They Work Together

| Aspect | CDC Pattern | dbt Materialization | Combined Example |
|---|---|---|---|
| Data Source | Captures changes from OLTP systems | Processes data from staging tables | CDC captures database logs → dbt processes with merge strategy |
| Change Detection | Identifies what changed and when | Uses CDC output to determine actions | CDC identifies updated orders → merge strategy updates dimension table |
| Metadata Handling | Adds operation types, timestamps, sequences | Uses metadata for conditional logic | CDC adds `_operation_type` → merge uses it for INSERT/UPDATE/DELETE |
| Performance | Reduces data volume by filtering changes | Optimizes warehouse operations | CDC processes 1,000 changed records → merge updates only those rows |

2.4 Practical Example: CDC + Merge Strategy

```sql
-- Step 1: The CDC pattern captures changes from the source
-- models/staging/stg_customers_cdc.sql
WITH cdc_changes AS (
    SELECT
        customer_id,
        customer_name,
        email,
        updated_at,
        _operation_type,    -- CDC metadata: INSERT, UPDATE, DELETE
        _sequence_number    -- CDC ordering
    FROM {{ source('cdc_logs', 'customer_changes') }}
    WHERE updated_at > '{{ var("last_processed_timestamp") }}'
)

SELECT * FROM cdc_changes
```

```sql
-- Step 2: The dbt merge strategy applies changes to the target
-- models/marts/dim_customers.sql
{{ config(
    materialized='incremental',
    incremental_strategy='merge',   -- dbt materialization strategy
    unique_key='customer_id'
) }}

SELECT
    customer_id,
    customer_name,
    email,
    updated_at,
    CASE WHEN _operation_type = 'DELETE' THEN TRUE ELSE FALSE END AS is_deleted
FROM {{ ref('stg_customers_cdc') }}

-- The merge strategy will:
-- - INSERT new customer_ids
-- - UPDATE existing customer_ids
-- - Handle DELETEs based on CDC metadata
```

2.5 Key Distinctions Summary

Think of it this way: CDC determines what changed and when in the source system; the dbt materialization strategy determines how those changes are physically written to the target table. CDC produces the change stream, and strategies like merge consume it.

Common Misconception:

The "merge" incremental strategy is not CDC itself. It's a SQL operation (MERGE statement) that dbt uses to apply changes. CDC is the broader pattern of identifying and processing those changes in the first place.

3. CDC Fundamentals

3.1 CDC Types

| CDC Type | Description | Use Case | Complexity |
|---|---|---|---|
| Timestamp-Based | Uses modification timestamps | Simple incremental loads | Low |
| Log-Based | Reads database transaction logs | Real-time replication | High |
| Trigger-Based | Database triggers capture changes | Detailed audit trails | Medium |
| Snapshot Comparison | Compares full snapshots | Batch processing | Medium |

3.2 CDC Architecture in dbt

```
-- CDC Architecture Components
Source System → CDC Capture → Staging Area → dbt Processing → Target Tables
      ↓              ↓              ↓               ↓               ↓
 Operational    Change Log     Raw Changes     Transformed     Final Data
  Database      Extraction       Staging         Changes       Warehouse
```

4. CDC Implementation Patterns

4.1 Incremental Model Pattern

```sql
-- models/staging/stg_orders_cdc.sql
{{ config(
    materialized='incremental',
    unique_key='order_id',
    on_schema_change='sync_all_columns'
) }}

WITH source_data AS (
    SELECT
        order_id,
        customer_id,
        order_status,
        order_date,
        updated_at,
        _operation_type,
        _sequence_number
    FROM {{ ref('raw_orders_cdc') }}
    {% if is_incremental() %}
    WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
    {% endif %}
),

-- Deduplicate based on sequence number (latest change wins)
deduplicated AS (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY order_id
            ORDER BY _sequence_number DESC
        ) AS rn
    FROM source_data
)

SELECT * FROM deduplicated
WHERE rn = 1
```

4.2 SCD Type 2 with CDC

```sql
-- models/marts/dim_customer_scd2.sql
{{ config(
    materialized='incremental',
    unique_key='surrogate_key'
) }}

WITH source_changes AS (
    SELECT
        customer_id,
        customer_name,
        email,
        address,
        phone,
        updated_at,
        _operation_type,
        _sequence_number
    FROM {{ ref('stg_customers_cdc') }}
    {% if is_incremental() %}
    WHERE updated_at > (SELECT MAX(effective_end_date) FROM {{ this }} WHERE effective_end_date IS NOT NULL)
       OR updated_at > (SELECT MAX(effective_start_date) FROM {{ this }} WHERE effective_end_date IS NULL)
    {% endif %}
),

-- Generate surrogate keys and validity periods
scd_logic AS (
    SELECT
        {{ dbt_utils.generate_surrogate_key(['customer_id', 'updated_at']) }} AS surrogate_key,
        customer_id,
        customer_name,
        email,
        address,
        phone,
        updated_at AS effective_start_date,
        CASE WHEN _operation_type = 'DELETE' THEN updated_at ELSE NULL END AS effective_end_date,
        _operation_type,
        CASE WHEN _operation_type = 'DELETE' THEN FALSE ELSE TRUE END AS is_current
    FROM source_changes
)

SELECT * FROM scd_logic
```

5. Timestamp-Based CDC

5.1 Basic Timestamp CDC

```sql
-- models/staging/stg_products_timestamp_cdc.sql
{{ config(
    materialized='incremental',
    unique_key='product_id'
) }}

SELECT
    product_id,
    product_name,
    category_id,
    price,
    created_at,
    updated_at,
    -- Add CDC metadata
    CURRENT_TIMESTAMP AS processed_at,
    'UPSERT' AS cdc_operation
FROM {{ source('ecommerce', 'products') }}
{% if is_incremental() %}
-- Only process records updated since the last run
WHERE updated_at > (
    SELECT COALESCE(MAX(updated_at), '1900-01-01'::timestamp) FROM {{ this }}
)
{% endif %}
```

5.2 Advanced Timestamp CDC with Deletes

```sql
-- models/staging/stg_customers_advanced_cdc.sql
{{ config(
    materialized='incremental',
    unique_key='customer_id'
) }}

WITH current_data AS (
    SELECT
        customer_id,
        customer_name,
        email,
        status,
        updated_at,
        FALSE AS is_deleted
    FROM {{ source('crm', 'customers') }}
    WHERE status != 'DELETED'
    {% if is_incremental() %}
      AND updated_at > (
          SELECT COALESCE(MAX(updated_at), '1900-01-01'::timestamp)
          FROM {{ this }}
          WHERE NOT is_deleted
      )
    {% endif %}
),

deleted_data AS (
    SELECT
        customer_id,
        customer_name,
        email,
        status,
        updated_at,
        TRUE AS is_deleted
    FROM {{ source('crm', 'customers') }}
    WHERE status = 'DELETED'
    {% if is_incremental() %}
      AND updated_at > (
          SELECT COALESCE(MAX(updated_at), '1900-01-01'::timestamp)
          FROM {{ this }}
          WHERE is_deleted
      )
    {% endif %}
)

SELECT * FROM current_data
UNION ALL
SELECT * FROM deleted_data
```

6. Log-Based CDC

6.1 Processing WAL/Binlog Data

```sql
-- models/staging/stg_orders_log_based_cdc.sql
{{ config(
    materialized='incremental',
    unique_key=['order_id', 'log_sequence_number']
) }}

WITH log_entries AS (
    SELECT
        -- Parse log entry data
        JSON_EXTRACT_SCALAR(log_data, '$.order_id') AS order_id,
        JSON_EXTRACT_SCALAR(log_data, '$.customer_id') AS customer_id,
        JSON_EXTRACT_SCALAR(log_data, '$.order_status') AS order_status,
        JSON_EXTRACT_SCALAR(log_data, '$.total_amount') AS total_amount,

        -- CDC metadata from the log
        operation_type,        -- INSERT, UPDATE, DELETE
        log_sequence_number,
        transaction_id,
        commit_timestamp,

        -- Row change metadata
        CASE operation_type
            WHEN 'UPDATE' THEN JSON_EXTRACT(log_data, '$.before')
            ELSE NULL
        END AS before_values,
        CASE operation_type
            WHEN 'DELETE' THEN JSON_EXTRACT(log_data, '$.before')
            ELSE JSON_EXTRACT(log_data, '$.after')
        END AS after_values
    FROM {{ source('cdc_logs', 'order_changes') }}
    {% if is_incremental() %}
    WHERE log_sequence_number > (
        SELECT COALESCE(MAX(log_sequence_number), 0) FROM {{ this }}
    )
    {% endif %}
),

-- Process changes with conflict resolution
processed_changes AS (
    SELECT
        *,
        -- Determine final operation after deduplication
        LAST_VALUE(operation_type) OVER (
            PARTITION BY order_id
            ORDER BY log_sequence_number
            ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
        ) AS final_operation,
        -- Mark as latest change for this key
        ROW_NUMBER() OVER (
            PARTITION BY order_id
            ORDER BY log_sequence_number DESC
        ) = 1 AS is_latest_change
    FROM log_entries
)

SELECT * FROM processed_changes
```

6.2 Log-Based CDC Deduplication

```sql
-- models/marts/fct_orders_current_state.sql
-- Reconstruct current state from log entries
WITH latest_operations AS (
    SELECT
        order_id,
        customer_id,
        order_status,
        total_amount,
        final_operation,
        commit_timestamp,
        -- Only keep the final state per order
        ROW_NUMBER() OVER (
            PARTITION BY order_id
            ORDER BY log_sequence_number DESC
        ) = 1 AS is_current_state
    FROM {{ ref('stg_orders_log_based_cdc') }}
    WHERE is_latest_change
),

current_orders AS (
    SELECT
        order_id,
        customer_id,
        order_status,
        total_amount,
        commit_timestamp AS last_updated_at
    FROM latest_operations
    WHERE is_current_state
      AND final_operation != 'DELETE'   -- Exclude deleted records
)

SELECT * FROM current_orders
```

7. Trigger-Based CDC

7.1 Processing Audit Table Data

```sql
-- models/staging/stg_inventory_audit_cdc.sql
{{ config(
    materialized='incremental',
    unique_key='audit_id'
) }}

WITH audit_changes AS (
    SELECT
        audit_id,
        table_name,
        operation_type,       -- I, U, D
        primary_key_value,
        changed_columns,
        old_values,
        new_values,
        changed_by,
        changed_at,
        -- Parse the changed data
        CASE
            WHEN operation_type = 'I' THEN new_values
            WHEN operation_type = 'D' THEN old_values
            ELSE new_values
        END AS current_values
    FROM {{ source('audit', 'inventory_audit') }}
    WHERE table_name = 'inventory'
    {% if is_incremental() %}
      AND changed_at > (
          SELECT COALESCE(MAX(changed_at), '1900-01-01'::timestamp) FROM {{ this }}
      )
    {% endif %}
),

-- Extract inventory fields from JSON values
parsed_inventory AS (
    SELECT
        audit_id,
        primary_key_value::INTEGER AS inventory_id,
        operation_type,
        changed_by,
        changed_at,
        -- Extract fields from JSON
        JSON_EXTRACT_SCALAR(current_values, '$.product_id')::INTEGER AS product_id,
        JSON_EXTRACT_SCALAR(current_values, '$.quantity_on_hand')::INTEGER AS quantity_on_hand,
        JSON_EXTRACT_SCALAR(current_values, '$.reserved_quantity')::INTEGER AS reserved_quantity,
        JSON_EXTRACT_SCALAR(current_values, '$.location_code') AS location_code,
        -- Track what changed
        changed_columns
    FROM audit_changes
)

SELECT * FROM parsed_inventory
```

8. Snapshot Comparison CDC

8.1 Full Snapshot Comparison

```sql
-- models/staging/stg_suppliers_snapshot_cdc.sql
{{ config(
    materialized='incremental',
    unique_key='supplier_id'
) }}

{% if is_incremental() %}

WITH current_snapshot AS (
    SELECT
        supplier_id,
        supplier_name,
        contact_email,
        phone_number,
        address,
        credit_rating,
        CURRENT_DATE AS snapshot_date,
        {{ dbt_utils.generate_surrogate_key([
            'supplier_name', 'contact_email', 'phone_number', 'address', 'credit_rating'
        ]) }} AS content_hash
    FROM {{ source('procurement', 'suppliers') }}
),

previous_snapshot AS (
    SELECT
        supplier_id,
        content_hash AS previous_content_hash
    FROM {{ this }}
    WHERE snapshot_date = (SELECT MAX(snapshot_date) FROM {{ this }})
),

-- Identify inserts and updates
changes AS (
    SELECT
        c.*,
        p.previous_content_hash,
        CASE
            WHEN p.supplier_id IS NULL THEN 'INSERT'
            WHEN c.content_hash != p.previous_content_hash THEN 'UPDATE'
            ELSE 'NO_CHANGE'
        END AS change_type
    FROM current_snapshot c
    LEFT JOIN previous_snapshot p
        ON c.supplier_id = p.supplier_id
),

-- Also identify deletes
deletes AS (
    SELECT
        p.supplier_id,
        NULL AS supplier_name,
        NULL AS contact_email,
        NULL AS phone_number,
        NULL AS address,
        NULL AS credit_rating,
        CURRENT_DATE AS snapshot_date,
        NULL AS content_hash,
        NULL AS previous_content_hash,
        'DELETE' AS change_type
    FROM previous_snapshot p
    LEFT JOIN current_snapshot c
        ON p.supplier_id = c.supplier_id
    WHERE c.supplier_id IS NULL
)

SELECT * FROM changes WHERE change_type != 'NO_CHANGE'
UNION ALL
SELECT * FROM deletes

{% else %}

-- Initial load
SELECT
    supplier_id,
    supplier_name,
    contact_email,
    phone_number,
    address,
    credit_rating,
    CURRENT_DATE AS snapshot_date,
    {{ dbt_utils.generate_surrogate_key([
        'supplier_name', 'contact_email', 'phone_number', 'address', 'credit_rating'
    ]) }} AS content_hash,
    NULL AS previous_content_hash,
    'INITIAL_LOAD' AS change_type
FROM {{ source('procurement', 'suppliers') }}

{% endif %}
```

9. SCD Integration with CDC

9.1 SCD Type 2 with CDC Processing

```sql
-- models/marts/dim_product_scd2_cdc.sql
{{ config(
    materialized='incremental',
    unique_key='surrogate_key',
    on_schema_change='sync_all_columns'
) }}

WITH cdc_changes AS (
    SELECT *
    FROM {{ ref('stg_products_timestamp_cdc') }}
    {% if is_incremental() %}
    WHERE processed_at > (
        SELECT COALESCE(MAX(processed_at), '1900-01-01'::timestamp) FROM {{ this }}
    )
    {% endif %}
),

-- Get current active records for comparison
{% if is_incremental() %}
current_active AS (
    SELECT * FROM {{ this }}
    WHERE is_current = TRUE
),
{% endif %}

-- Process Type 2 changes
scd_processing AS (
    SELECT
        {{ dbt_utils.generate_surrogate_key(['product_id', 'updated_at']) }} AS surrogate_key,
        product_id,
        product_name,
        category_id,
        price,
        updated_at AS effective_start_date,
        NULL AS effective_end_date,
        TRUE AS is_current,
        cdc_operation,
        processed_at
    FROM cdc_changes
    WHERE cdc_operation != 'DELETE'
),

-- Handle record expiration for updates
{% if is_incremental() %}
expired_records AS (
    SELECT
        c.surrogate_key,
        c.product_id,
        c.product_name,
        c.category_id,
        c.price,
        c.effective_start_date,
        cc.updated_at AS effective_end_date,   -- Close the record
        FALSE AS is_current,
        c.cdc_operation,
        c.processed_at
    FROM current_active c
    INNER JOIN cdc_changes cc
        ON c.product_id = cc.product_id
    WHERE cc.cdc_operation IN ('UPDATE', 'DELETE')
),
{% endif %}

-- Combine all changes
final_changes AS (
    SELECT * FROM scd_processing
    {% if is_incremental() %}
    UNION ALL
    SELECT * FROM expired_records
    {% endif %}
)

SELECT * FROM final_changes
```

10. Real-Time CDC Processing

10.1 Micro-Batch Processing

```sql
-- models/staging/stg_events_microbatch_cdc.sql
{{ config(
    materialized='incremental',
    incremental_strategy='microbatch',
    event_time='event_timestamp',
    batch_size='hour',
    unique_key='event_id'
) }}

WITH event_stream AS (
    SELECT
        event_id,
        user_id,
        event_type,
        event_data,
        event_timestamp,
        ingested_at,
        -- Add microbatch processing metadata
        DATE_TRUNC('hour', event_timestamp) AS batch_hour,
        LAG(event_timestamp) OVER (
            PARTITION BY user_id
            ORDER BY event_timestamp
        ) AS previous_event_time
    FROM {{ source('events', 'user_events') }}
    WHERE event_timestamp >= '{{ var("start_time") }}'
      AND event_timestamp < '{{ var("end_time") }}'
),

-- Process changes within the microbatch
processed_events AS (
    SELECT
        *,
        -- Calculate minutes between consecutive events
        COALESCE(
            EXTRACT(EPOCH FROM (event_timestamp - previous_event_time)) / 60,
            0
        ) AS minutes_since_last_event,
        -- Detect session boundaries (30-minute inactivity threshold)
        CASE
            WHEN previous_event_time IS NULL THEN 'SESSION_START'
            WHEN EXTRACT(EPOCH FROM (event_timestamp - previous_event_time)) > 1800 THEN 'SESSION_START'
            ELSE 'SESSION_CONTINUE'
        END AS session_indicator
    FROM event_stream
)

SELECT * FROM processed_events
```

10.2 Real-Time Aggregation with CDC

```sql
-- models/marts/fct_user_activity_realtime.sql
{{ config(
    materialized='incremental',
    unique_key=['user_id', 'activity_hour'],
    on_schema_change='sync_all_columns'
) }}

WITH hourly_activity AS (
    SELECT
        user_id,
        DATE_TRUNC('hour', event_timestamp) AS activity_hour,
        COUNT(*) AS event_count,
        COUNT(DISTINCT event_type) AS unique_event_types,
        MIN(event_timestamp) AS first_event_time,
        MAX(event_timestamp) AS last_event_time,
        -- Session metrics
        COUNT(CASE WHEN session_indicator = 'SESSION_START' THEN 1 END) AS new_sessions,
        SUM(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END) AS purchase_events,
        -- Real-time processing metadata
        MAX(ingested_at) AS last_processed_at,
        CURRENT_TIMESTAMP AS aggregation_timestamp
    FROM {{ ref('stg_events_microbatch_cdc') }}
    {% if is_incremental() %}
    -- Filter on the expression, not the alias: the activity_hour alias
    -- is not visible in the WHERE clause
    WHERE DATE_TRUNC('hour', event_timestamp) > (
        SELECT COALESCE(MAX(activity_hour), '1900-01-01'::timestamp) FROM {{ this }}
    )
    {% endif %}
    GROUP BY user_id, DATE_TRUNC('hour', event_timestamp)
)

SELECT * FROM hourly_activity
```

11. Performance Optimization

11.1 Partition-Based CDC Processing

```sql
-- models/staging/stg_transactions_partitioned_cdc.sql
{{ config(
    materialized='incremental',
    unique_key='transaction_id',
    partition_by={
        'field': 'transaction_date',
        'data_type': 'date',
        'granularity': 'day'
    },
    cluster_by=['account_id', 'transaction_type']
) }}

WITH partitioned_source AS (
    SELECT
        transaction_id,
        account_id,
        transaction_type,
        amount,
        currency,
        transaction_timestamp,
        DATE(transaction_timestamp) AS transaction_date,
        updated_at,
        -- Partition metadata for optimization
        DATE_TRUNC('month', transaction_timestamp) AS partition_month
    FROM {{ source('banking', 'transactions') }}
    {% if is_incremental() %}
    -- Leverage partition pruning
    WHERE DATE(transaction_timestamp) >= CURRENT_DATE - INTERVAL '7 days'
      AND updated_at > (
          SELECT COALESCE(MAX(updated_at), '1900-01-01'::timestamp)
          FROM {{ this }}
          WHERE transaction_date >= CURRENT_DATE - INTERVAL '7 days'
      )
    {% endif %}
),

-- Add processing optimizations
optimized_processing AS (
    SELECT
        *,
        -- Pre-calculate common aggregation keys
        {{ dbt_utils.generate_surrogate_key(['account_id', 'transaction_date']) }} AS daily_account_key,
        -- Add change detection hash for efficiency
        {{ dbt_utils.generate_surrogate_key([
            'amount', 'currency', 'transaction_type'
        ]) }} AS transaction_content_hash
    FROM partitioned_source
)

SELECT * FROM optimized_processing
```

11.2 Incremental Strategy Optimization

```sql
-- macros/optimize_cdc_merge.sql
-- Note: unique_key_to_sql is assumed to be a project helper that renders
-- the join predicate (e.g. target.order_id = source.order_id).
{% macro optimize_cdc_merge(target_relation, temp_relation, unique_key, dest_columns) %}
    {% set merge_sql %}
        MERGE INTO {{ target_relation }} AS target
        USING (SELECT * FROM {{ temp_relation }}) AS source
            ON {{ unique_key_to_sql(unique_key, 'target', 'source') }}

        -- Evaluate the DELETE branch first: MERGE clauses match in order,
        -- so deletes must not be swallowed by the UPDATE branch
        WHEN MATCHED AND source._operation_type = 'DELETE' THEN
            DELETE

        WHEN MATCHED AND (
            -- Only update if content actually changed
            target.content_hash != source.content_hash
            OR target.updated_at < source.updated_at
        ) THEN UPDATE SET
            {% for column in dest_columns -%}
            {{ column.name }} = source.{{ column.name }}{%- if not loop.last %},{% endif %}
            {% endfor %}

        WHEN NOT MATCHED THEN INSERT (
            {%- for column in dest_columns -%}
            {{ column.name }}{%- if not loop.last %}, {% endif %}
            {%- endfor -%}
        ) VALUES (
            {%- for column in dest_columns -%}
            source.{{ column.name }}{%- if not loop.last %}, {% endif %}
            {%- endfor -%}
        )
    {% endset %}

    {{ return(merge_sql) }}
{% endmacro %}
```
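To have dbt invoke a macro like this as a model's incremental strategy, recent dbt versions let you register a custom strategy by defining a macro named `get_incremental_<strategy>_sql`. A minimal sketch, assuming the macro above and an illustrative strategy name of `cdc_merge`:

```sql
-- macros/get_incremental_cdc_merge_sql.sql
-- Registers a custom strategy usable as incremental_strategy='cdc_merge'
{% macro get_incremental_cdc_merge_sql(arg_dict) %}
    {{ optimize_cdc_merge(
        arg_dict['target_relation'],
        arg_dict['temp_relation'],
        arg_dict['unique_key'],
        arg_dict['dest_columns']
    ) }}
{% endmacro %}
```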

12. Monitoring and Alerting

12.1 CDC Processing Metrics

```sql
-- models/monitoring/cdc_processing_metrics.sql
WITH cdc_runs AS (
    SELECT
        model_name,
        run_started_at,
        run_completed_at,
        status,
        rows_affected,
        execution_time_seconds,
        -- Calculate processing lag in minutes
        EXTRACT(EPOCH FROM (
            run_completed_at - (SELECT MAX(updated_at) FROM {{ ref('stg_source_table') }})
        )) / 60 AS processing_lag_minutes,
        -- Detect anomalies
        rows_affected - LAG(rows_affected, 1, 0) OVER (
            PARTITION BY model_name ORDER BY run_started_at
        ) AS row_change_from_previous,
        execution_time_seconds - LAG(execution_time_seconds, 1, 0) OVER (
            PARTITION BY model_name ORDER BY run_started_at
        ) AS execution_time_change
    FROM {{ source('dbt_metadata', 'model_runs') }}
    WHERE model_name LIKE '%_cdc'
),

metrics_summary AS (
    SELECT
        model_name,
        run_completed_at,
        -- SLA metrics
        CASE
            WHEN processing_lag_minutes > 30 THEN 'SLA_BREACH'
            WHEN processing_lag_minutes > 15 THEN 'SLA_WARNING'
            ELSE 'SLA_OK'
        END AS sla_status,
        -- Anomaly detection: flag changes larger than 3x the trailing average
        CASE
            WHEN ABS(row_change_from_previous) > (
                AVG(ABS(row_change_from_previous)) OVER (
                    PARTITION BY model_name
                    ORDER BY run_started_at
                    ROWS BETWEEN 10 PRECEDING AND 1 PRECEDING
                ) * 3
            ) THEN 'ANOMALY_DETECTED'
            ELSE 'NORMAL'
        END AS volume_anomaly,
        processing_lag_minutes,
        execution_time_seconds,
        rows_affected
    FROM cdc_runs
    WHERE run_completed_at >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
)

SELECT * FROM metrics_summary
```
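A lightweight way to act on these metrics is a singular test that fails the build whenever an SLA breach appears. A sketch, assuming the model above:

```sql
-- tests/assert_no_cdc_sla_breaches.sql
-- Singular tests fail when they return rows, so any breach in the
-- monitoring model fails the build
SELECT *
FROM {{ ref('cdc_processing_metrics') }}
WHERE sla_status = 'SLA_BREACH'
```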

12.2 Data Quality Monitoring for CDC

```sql
-- tests/generic/test_cdc_data_quality.sql
-- Generic test for common CDC data quality issues
{% test cdc_data_quality(model, unique_key, partition_filter=none) %}

WITH failures AS (

    SELECT
        'duplicate_keys' AS test_type,
        COUNT(*) AS failure_count,
        'Found duplicate keys in CDC processing' AS error_message
    FROM (
        SELECT {{ unique_key }}
        FROM {{ model }}
        WHERE {{ partition_filter if partition_filter else '1=1' }}
        GROUP BY {{ unique_key }}
        HAVING COUNT(*) > 1
    ) AS duplicates

    UNION ALL

    SELECT
        'missing_timestamps' AS test_type,
        COUNT(*) AS failure_count,
        'Found records with missing CDC timestamps' AS error_message
    FROM {{ model }}
    WHERE updated_at IS NULL
      AND {{ partition_filter if partition_filter else '1=1' }}

    UNION ALL

    SELECT
        'future_timestamps' AS test_type,
        COUNT(*) AS failure_count,
        'Found records with future timestamps' AS error_message
    FROM {{ model }}
    WHERE updated_at > CURRENT_TIMESTAMP + INTERVAL '1 hour'
      AND {{ partition_filter if partition_filter else '1=1' }}

    UNION ALL

    SELECT
        'sequence_gaps' AS test_type,
        COUNT(*) AS failure_count,
        'Found gaps in sequence numbers' AS error_message
    FROM (
        SELECT
            sequence_number,
            LAG(sequence_number) OVER (ORDER BY sequence_number) AS prev_seq
        FROM {{ model }}
        WHERE {{ partition_filter if partition_filter else '1=1' }}
    ) AS sequenced
    WHERE sequence_number - prev_seq > 1
)

-- The test fails only when a check actually reports failures
SELECT * FROM failures
WHERE failure_count > 0

{% endtest %}
```
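Assuming the `{% test %}` wrapper above, this attaches to models like any other generic test from a properties file, with `unique_key` and the optional `partition_filter` passed as test arguments.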

13. Troubleshooting Guide

Common CDC Issues and Solutions

1. Missing Changes: late-arriving records fall outside the incremental timestamp filter. Reprocess a small overlap window on each run (see the sketch after this list) and rely on the model's unique_key to deduplicate.

2. Duplicate Processing: overlapping incremental windows, or a missing unique_key, apply the same change twice. Deduplicate on the key and order by _sequence_number so the latest change wins (see section 4.1).

3. Performance Degradation: MAX() lookups against {{ this }} and unpartitioned targets slow down as tables grow. Add partition pruning, clustering, and content hashes (see section 11).

4. Data Consistency Issues: changes applied out of order leave the target in an intermediate state. Sort by sequence number or commit timestamp before applying changes, and handle deletes explicitly (see section 6).
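A common fix for missing late-arriving changes is to reprocess a small overlap window on every run; a sketch (the 3-day lookback is an assumed tuning value):

```sql
{% if is_incremental() %}
-- Reprocess a small overlap so late-arriving records are not skipped;
-- the model's unique_key deduplicates rows seen twice
WHERE updated_at > (
    SELECT MAX(updated_at) - INTERVAL '3 days' FROM {{ this }}
)
{% endif %}
```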

14. Code Examples

For detailed implementation examples and ready-to-use code templates, see the comprehensive code examples in the repository:

📁 Code Examples Repository

Access all CDC implementation examples and templates:

15. Best Practices

15.1 Design Principles

- Make every CDC model idempotent: rerunning the same window must not create duplicates
- Treat deletes as first-class changes rather than silently missing rows
- Carry CDC metadata (operation type, sequence number, timestamps) through every layer
- Start with the simplest CDC type that meets the latency requirement; adopt log-based capture only when timestamps are not enough

15.2 Implementation Guidelines

CDC Best Practices Checklist:

1. ✅ Use unique keys for deduplication
2. ✅ Implement proper timestamp handling (UTC)
3. ✅ Add sequence numbers for ordering
4. ✅ Include operation type metadata
5. ✅ Handle deletes explicitly
6. ✅ Monitor processing lag and volumes
7. ✅ Test with edge cases (late arrivals, duplicates)
8. ✅ Document data lineage and dependencies
9. ✅ Implement circuit breakers for large volumes (see the sketch after this list)
10. ✅ Plan for disaster recovery scenarios
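Item 9 is the one least often shown in code. A minimal sketch of a circuit breaker, assuming a macro invoked with `{% do cdc_circuit_breaker(source('cdc_logs', 'customer_changes')) %}` at the top of a CDC model; the relation and threshold are illustrative:

```sql
-- macros/cdc_circuit_breaker.sql
-- Aborts the run when the incoming change volume looks abnormal
{% macro cdc_circuit_breaker(source_relation, max_rows=1000000) %}
    {% if execute %}
        {% set result = run_query('SELECT COUNT(*) FROM ' ~ source_relation) %}
        {% set row_count = result.columns[0].values()[0] %}
        {% if row_count > max_rows %}
            {% do exceptions.raise_compiler_error(
                'CDC circuit breaker tripped: ' ~ row_count ~ ' rows exceeds limit of ' ~ max_rows
            ) %}
        {% endif %}
    {% endif %}
{% endmacro %}
```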

15.3 Production Readiness

| Category | Requirement | Implementation |
|---|---|---|
| Monitoring | Processing lag alerts | SLA-based alerting on processing delays |
| Data Quality | Automated testing | dbt tests for duplicates, gaps, consistency |
| Performance | Scalable architecture | Partitioning, clustering, micro-batches |
| Recovery | Backfill capability | Historical reprocessing workflows |
| Documentation | Operational runbooks | Troubleshooting guides and escalation paths |

16. Advanced Patterns

16.1 Multi-Source CDC Federation

```sql
-- models/staging/stg_customer_360_cdc.sql
-- Federate CDC from multiple customer data sources
WITH crm_changes AS (
    SELECT
        'CRM' AS source_system,
        customer_id,
        email,
        phone,
        NULL AS billing_address,
        updated_at,
        _operation_type
    FROM {{ ref('stg_crm_customers_cdc') }}
),

billing_changes AS (
    SELECT
        'BILLING' AS source_system,
        customer_id,
        NULL AS email,
        NULL AS phone,
        billing_address,
        updated_at,
        _operation_type
    FROM {{ ref('stg_billing_customers_cdc') }}
),

-- Merge the change streams
federated_changes AS (
    SELECT * FROM crm_changes
    UNION ALL
    SELECT * FROM billing_changes
),

-- Apply precedence rules (CRM wins for contact info, Billing for address)
resolved_changes AS (
    SELECT DISTINCT
        customer_id,
        FIRST_VALUE(email) OVER (
            PARTITION BY customer_id
            ORDER BY CASE WHEN source_system = 'CRM' THEN 0 ELSE 1 END, updated_at DESC
            ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
        ) AS email,
        FIRST_VALUE(phone) OVER (
            PARTITION BY customer_id
            ORDER BY CASE WHEN source_system = 'CRM' THEN 0 ELSE 1 END, updated_at DESC
            ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
        ) AS phone,
        FIRST_VALUE(billing_address) OVER (
            PARTITION BY customer_id
            ORDER BY CASE WHEN source_system = 'BILLING' THEN 0 ELSE 1 END, updated_at DESC
            ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
        ) AS billing_address,
        -- Window MAX instead of GROUP BY: window functions cannot be mixed
        -- with grouped aggregation in the same SELECT, so DISTINCT collapses
        -- the per-customer constants to one row
        MAX(updated_at) OVER (PARTITION BY customer_id) AS last_updated_at
    FROM federated_changes
)

SELECT * FROM resolved_changes
```

16.2 Event Sourcing with CDC

```sql
-- models/marts/customer_event_store.sql
-- Implement an event sourcing pattern with CDC
{{ config(
    materialized='incremental',
    unique_key='event_id',
    order_by='event_timestamp'
) }}

WITH source_events AS (
    SELECT
        {{ dbt_utils.generate_surrogate_key(['customer_id', 'updated_at', 'source_system']) }} AS event_id,
        customer_id,
        'CUSTOMER_UPDATED' AS event_type,
        updated_at AS event_timestamp,
        -- Event payload with before/after state
        JSON_BUILD_OBJECT(
            'before', JSON_BUILD_OBJECT(
                'email', LAG(email) OVER (PARTITION BY customer_id ORDER BY updated_at),
                'phone', LAG(phone) OVER (PARTITION BY customer_id ORDER BY updated_at),
                'status', LAG(status) OVER (PARTITION BY customer_id ORDER BY updated_at)
            ),
            'after', JSON_BUILD_OBJECT(
                'email', email,
                'phone', phone,
                'status', status
            ),
            'source_system', source_system,
            'operation_type', _operation_type
        ) AS event_payload,
        -- Event metadata
        source_system,
        _operation_type,
        ROW_NUMBER() OVER (ORDER BY updated_at) AS global_sequence_number
    FROM {{ ref('stg_customer_360_cdc') }}
    {% if is_incremental() %}
    WHERE updated_at > (
        SELECT COALESCE(MAX(event_timestamp), '1900-01-01'::timestamp) FROM {{ this }}
    )
    {% endif %}
)

SELECT * FROM source_events
```

🎯 Summary

This comprehensive guide provides everything needed to implement robust Change Data Capture patterns in dbt. From basic timestamp-based approaches to advanced event sourcing architectures, these patterns enable efficient, scalable data synchronization and historical tracking.

Key Takeaways:

- CDC is the pattern for identifying and capturing changes; dbt's incremental strategies are the mechanism for applying them
- Choose the CDC type (timestamp, log, trigger, or snapshot) based on latency needs and source-system access
- Deduplicate on a unique key and order by sequence number so the latest change wins
- Handle deletes explicitly, and monitor processing lag, volumes, and data quality in production


Official dbt Documentation: For the most up-to-date information on dbt incremental models and CDC patterns, visit dbt Incremental Models and dbt Incremental Strategies.