Full-table reloads work fine until they don't — and when they break, they break at 3 AM during a product launch. This guide walks you through replacing that TRUNCATE + INSERT job with a log-based CDC incremental ELT pipeline that handles deletes, survives schema drift, and ships to your warehouse in under 60 seconds of end-to-end latency.

We cover every step from enabling WAL replication on Postgres to writing idempotent dbt MERGE models — including the three production gotchas the other tutorials skip.


TL;DR

A production-grade CDC stack for incremental ELT reads database change events directly from the write-ahead log (WAL), lands raw events in a bronze layer, and applies an idempotent MERGE into silver/gold tables. The key pieces are Debezium 2.6 (log-based capture), Kafka as the event buffer, and dbt-core 1.8 incremental models with unique_key for upserts. You get sub-minute latency, delete awareness, and zero full-table locks on your source database. The cost is operational complexity: Kafka Connect clusters don't manage themselves.


Why Full Reloads Will Eventually Break You

A TRUNCATE + INSERT pipeline is conceptually simple: blow away yesterday's data, reload everything. It works at 10 GB. At 500 GB it starts to strain. At 5 TB it becomes the single largest cost line in your Snowflake bill, and that SELECT * you're running against production every hour is the reason your backend team filed three incidents last quarter.

The failure modes are predictable:

Log-based CDC solves all four by reading database transaction events rather than querying the table itself. The DataBasin platform ships all three layers of this architecture — capture, transform, and reporting — as a single managed service if you'd rather skip the operational work.


Architecture Overview

Here is the full event flow from source write to warehouse query:

flowchart LR
    subgraph Source["Source DB"]
        PG[(PostgreSQL\nWAL / binlog)]
    end

    subgraph Capture["Change Capture"]
        DEB[Debezium 2.6\nKafka Connect]
    end

    subgraph Streaming["Event Bus"]
        KF[Kafka\nTopic per table]
    end

    subgraph Landing["Bronze Layer\n(raw events)"]
        BRZ[(Object Storage\nor Raw Table)]
    end

    subgraph Transform["dbt 1.8"]
        INC[Incremental Model\nMERGE / UPSERT]
    end

    subgraph Warehouse["Warehouse"]
        SIL[(Silver\nclean + deduped)]
        GLD[(Gold\nagg + business logic)]
    end

    PG -->|WAL stream| DEB
    DEB -->|JSON events| KF
    KF -->|Kafka Connect\nSink| BRZ
    BRZ -->|dbt source| INC
    INC --> SIL
    SIL --> GLD

Where each component does its job:

| Layer | Tool | Responsibility |
| --- | --- | --- |
| Capture | Debezium 2.6 | Tail the WAL; emit structured before/after JSON |
| Transport | Kafka | Buffer events; decouple source throughput from sink speed |
| Landing | S3 / GCS / Snowflake stage | Raw immutable log; replay point if downstream fails |
| Transform | dbt-core 1.8 | Idempotent MERGE into cleaned tables |
| Serving | Snowflake / BigQuery / Redshift | Query-optimized silver and gold layers |

Step 1 — Enable CDC on Postgres (or MySQL)

PostgreSQL

Postgres uses logical replication slots. You need wal_level = logical and a replication user.

-- postgresql.conf (requires restart)
-- wal_level = logical
-- max_replication_slots = 10   -- add one per Debezium connector
-- max_wal_senders = 10

-- Run as superuser
CREATE ROLE debezium_user REPLICATION LOGIN PASSWORD 'changeme_in_prod';

GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public
  GRANT SELECT ON TABLES TO debezium_user;

-- Verify wal_level after the restart (any role can run SHOW for this setting)
SHOW wal_level;
-- Expected output:
-- wal_level
-- -----------
-- logical

Important: pgoutput (the native plugin, Postgres 10+) is preferred over decoderbufs. It requires no extension installation and reduces dependency surface.
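
Before restarting the database, it can help to lint the config file for the settings above. The following is a minimal sketch, assuming a plain `key = value` postgresql.conf and the Postgres 10+ defaults for any setting that is absent; `parse_conf` and `cdc_problems` are hypothetical helper names, not part of any Postgres tooling.

```python
# Assumed Postgres 10+ defaults, used when a setting is absent from the file.
PG_DEFAULTS = {
    "wal_level": "replica",
    "max_replication_slots": "10",
    "max_wal_senders": "10",
}


def parse_conf(text):
    """Parse `key = value` lines, dropping `#` comments and blank lines."""
    settings = dict(PG_DEFAULTS)
    for raw in text.splitlines():
        line = raw.split("#", 1)[0].strip()
        if "=" not in line:
            continue
        key, _, value = line.partition("=")
        settings[key.strip().lower()] = value.strip().strip("'")
    return settings


def cdc_problems(text):
    """Return human-readable problems that would block logical decoding."""
    settings = parse_conf(text)
    problems = []
    level = settings["wal_level"]
    if level != "logical":
        problems.append(f"wal_level is '{level}', expected 'logical'")
    for key in ("max_replication_slots", "max_wal_senders"):
        if int(settings[key]) < 1:
            problems.append(f"{key} must be at least 1")
    return problems


if __name__ == "__main__":
    sample = "wal_level = replica\nmax_replication_slots = 10\n"
    print(cdc_problems(sample))
```

An empty list means the file satisfies the prerequisites listed in this step; it does not confirm that the server has been restarted with them.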

MySQL

MySQL uses the binary log in row format. You also need GTID mode enabled for reliable Debezium operation:

# /etc/mysql/mysql.conf.d/mysqld.cnf
[mysqld]
server-id         = 1
log_bin           = mysql-bin
binlog_format     = ROW
binlog_row_image  = FULL
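binlog_expire_logs_seconds = 604800  # 7 days; replaces expire_logs_days, deprecated since MySQL 8.0
gtid_mode         = ON
enforce_gtid_consistency = ON

-- Run as an administrative user
CREATE USER 'debezium'@'%' IDENTIFIED BY 'changeme_in_prod';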
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
  ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;

GTID mode is not optional for MySQL in production. Without it, a Debezium connector restart after a source failover will either miss events or duplicate them, depending on binlog position drift.


Step 2 — Configure Debezium 2.6 + Kafka Connect

Deploy Kafka Connect with the Debezium PostgreSQL connector plugin. The connector runs as a single task inside a Kafka Connect worker, typically deployed in distributed mode.

{
  "name": "pg-orders-cdc",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "pg-primary.internal",
    "database.port": "5432",
    "database.user": "debezium_user",
    "database.password": "${file:/opt/kafka/secrets/db.properties:db.password}",
    "database.dbname": "orders_db",
    "topic.prefix": "cdc.orders_pg",
    "table.include.list": "public.orders,public.order_items,public.customers",
    "slot.name": "debezium_orders",
    "publication.name": "debezium_pub",
    "heartbeat.interval.ms": "5000",
    "snapshot.mode": "initial",
    "snapshot.isolation.mode": "repeatable_read",
    "tombstones.on.delete": "true",
    "decimal.handling.mode": "double",
    "time.precision.mode": "connect",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.unwrap.add.fields": "op,ts_ms,source.ts_ms",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false"
  }
}

Register it via the Kafka Connect REST API:

curl -X POST \
  -H "Content-Type: application/json" \
  --data @pg-orders-cdc.json \
  http://kafka-connect:8083/connectors

# Expected output (connector enters RUNNING state within ~10 s):
# {
#   "name": "pg-orders-cdc",
#   "config": { ... },
#   "tasks": [{"connector": "pg-orders-cdc", "task": 0}],
#   "type": "source"
# }

# Verify connector status
curl http://kafka-connect:8083/connectors/pg-orders-cdc/status
# "state": "RUNNING"
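
To make the unwrap transform in the connector config concrete, here is a simplified Python model of what ExtractNewRecordState does with delete.handling.mode set to rewrite and add.fields set to op,ts_ms,source.ts_ms. It is a sketch of the documented behavior, not Debezium's implementation, and the `unwrap` function name is hypothetical.

```python
def unwrap(envelope):
    """Flatten a Debezium change envelope into a single row dict."""
    is_delete = envelope["op"] == "d"
    # Deletes carry no "after" image, so the row state comes from "before".
    row = dict(envelope["before"] if is_delete else envelope["after"])
    # Rewrite mode marks every record with a string-valued __deleted flag.
    row["__deleted"] = "true" if is_delete else "false"
    # add.fields entries are prefixed with a double underscore.
    row["__op"] = envelope["op"]
    row["__ts_ms"] = envelope["ts_ms"]
    row["__source_ts_ms"] = envelope["source"]["ts_ms"]
    return row


if __name__ == "__main__":
    update = {
        "before": None,
        "after": {"order_id": 7, "status": "paid"},
        "source": {"ts_ms": 1000},
        "op": "u",
        "ts_ms": 1005,
    }
    print(unwrap(update))
```

The flattened `__op`, `__ts_ms`, and `__source_ts_ms` fields are the ones the dbt model in Step 4 reads.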

Key config decisions explained:

If managing Kafka Connect workers sounds like a second job, Flowbasin ships pre-built CDC connectors for PostgreSQL, MySQL, and 50+ other sources — no connector JSON to maintain, no JVM heap tuning.


Step 3 — Land Raw Events in the Bronze Layer

The sink connector writes raw CDC events to object storage or a staging table. Use a Confluent S3 Sink or Snowflake Kafka Connector for the landing zone.

{
  "name": "s3-bronze-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "4",
    "topics.regex": "cdc\\.orders_pg\\..*",
    "s3.region": "us-east-1",
    "s3.bucket.name": "databasin-bronze",
    "s3.part.size": "67108864",
    "flush.size": "10000",
    "rotate.interval.ms": "60000",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "parquet.codec": "snappy",
    "schema.compatibility": "NONE",
    "behavior.on.null.values": "ignore",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "partition.duration.ms": "3600000",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "locale": "en_US",
    "timezone": "UTC",
    "timestamp.extractor": "RecordField",
    "timestamp.field": "__ts_ms",
    "topics.dir": "raw/cdc",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}
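
The sink only picks up topics that match topics.regex, and Debezium names each topic `<topic.prefix>.<schema>.<table>`, so the two configs have to agree. A small pre-deploy check, sketched below with hypothetical helper names, catches a mismatch before the sink silently consumes nothing.

```python
import re


def debezium_topics(topic_prefix, table_include_list):
    """Derive topic names as <topic.prefix>.<schema>.<table>."""
    tables = [t.strip() for t in table_include_list.split(",") if t.strip()]
    return [f"{topic_prefix}.{table}" for table in tables]


def unmatched_topics(topics, topics_regex):
    """Return topics the sink would NOT subscribe to (whole-name match)."""
    pattern = re.compile(topics_regex)
    return [t for t in topics if not pattern.fullmatch(t)]


if __name__ == "__main__":
    tables = "public.orders,public.order_items,public.customers"
    sink_regex = r"cdc\.orders_pg\..*"  # the JSON-unescaped topics.regex
    for prefix in ("cdc.orders_pg", "cdc"):
        missing = unmatched_topics(debezium_topics(prefix, tables), sink_regex)
        print(prefix, "->", missing or "all topics matched")
```

With the prefix `cdc.orders_pg` every topic matches the sink regex; with a bare `cdc` prefix none do.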

The resulting S3 path structure looks like:

s3://databasin-bronze/raw/cdc/
  cdc.orders_pg.public.orders/
    year=2026/month=04/day=19/hour=14/
      cdc.orders_pg.public.orders+0+0000000000.parquet
      cdc.orders_pg.public.orders+0+0000010000.parquet

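Each Parquet file contains raw CDC event rows with __op, __ts_ms, __source_ts_ms, and all source columns. The bronze layer is append-only and immutable: you never modify it, and it is your replay point if a downstream transform fails. One caveat to verify before deploying: Confluent documents ParquetFormat as requiring a schema-aware converter (Avro, Protobuf, or JSON Schema), so the schemaless JsonConverter settings shown in both connector configs would need to change together, or the sink would need JsonFormat instead. Once events land here, Lakebasin can query the raw Parquet directly via Trino before you've even written the dbt model.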
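
When you need to locate the files for a given event during a replay, the hourly layout can be reproduced from the event's capture timestamp. This sketch assumes hourly UTC partitions keyed on `__ts_ms` and the `raw/cdc` topics directory from the sink config; `bronze_prefix` is a hypothetical helper.

```python
from datetime import datetime, timezone


def bronze_prefix(topic, ts_ms, topics_dir="raw/cdc"):
    """Return the object-key prefix for the hour containing ts_ms (UTC)."""
    ts = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
    return f"{topics_dir}/{topic}/" + ts.strftime(
        "year=%Y/month=%m/day=%d/hour=%H"
    )


if __name__ == "__main__":
    event_time = datetime(2026, 4, 19, 14, 30, tzinfo=timezone.utc)
    ts_ms = int(event_time.timestamp() * 1000)
    print(bronze_prefix("cdc.orders_pg.public.orders", ts_ms))
```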


Step 4 — Apply Changes with dbt Incremental Models

This is where most tutorials wave their hands. Here is the complete, idempotent MERGE pattern.

models/silver/orders.sql (dbt-core 1.8, dbt-snowflake 1.8):

-- models/silver/orders.sql
{{
  config(
    materialized='incremental',
    unique_key='order_id',
    incremental_strategy='merge',
    on_schema_change='append_new_columns',
    merge_exclude_columns=['created_at'],
    post_hook=[
      "DELETE FROM {{ this }} WHERE _cdc_deleted = true AND _cdc_deleted_at < DATEADD(day, -7, CURRENT_TIMESTAMP)"
    ]
  )
}}

WITH raw_events AS (
  SELECT
    order_id::INTEGER                          AS order_id,
    customer_id::INTEGER                       AS customer_id,
    status::VARCHAR                            AS status,
    total_amount::NUMERIC(12,2)                AS total_amount,
    TO_TIMESTAMP_NTZ(__source_ts_ms / 1000)   AS event_at,
    TO_TIMESTAMP_NTZ(__ts_ms / 1000)          AS captured_at,
    __op                                       AS _cdc_op,
    (__op = 'd')::BOOLEAN                      AS _cdc_deleted,
    CASE WHEN __op = 'd'
         THEN TO_TIMESTAMP_NTZ(__ts_ms / 1000)
         ELSE NULL END                         AS _cdc_deleted_at,
    ROW_NUMBER() OVER (
      PARTITION BY order_id
      ORDER BY __source_ts_ms DESC, __ts_ms DESC
    )                                          AS _row_rank
  FROM {{ source('bronze', 'cdc_orders_pg_public_orders') }}

  {% if is_incremental() %}
  WHERE TO_TIMESTAMP_NTZ(__ts_ms / 1000) > (
    SELECT COALESCE(MAX(captured_at), '1970-01-01') FROM {{ this }}
  )
  {% endif %}
)

SELECT
  order_id,
  customer_id,
  status,
  total_amount,
  event_at,
  captured_at,
  _cdc_op,
  _cdc_deleted,
  _cdc_deleted_at
FROM raw_events
WHERE _row_rank = 1

Why this model is idempotent:

  1. ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY __source_ts_ms DESC) — if the same order_id appears twice in the micro-batch (e.g., two updates arrived before dbt ran), only the latest event wins.
  2. unique_key='order_id' with incremental_strategy='merge' — Snowflake/BigQuery/Redshift will MERGE by order_id, not blindly INSERT.
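  3. The {% if is_incremental() %} filter reads only bronze rows newer than the watermark left by the last successful run. If the job fails and re-runs, it re-processes from the same watermark. Be aware that a strict > on MAX(captured_at) can skip events that land in bronze late with an older __ts_ms; subtracting a small lookback window from the watermark is a common safeguard, and the MERGE makes the overlap harmless.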
  4. Soft-delete pattern: rows with _cdc_deleted = true are retained for 7 days (for downstream audit), then physically removed by the post_hook.

This is at-least-once delivery with idempotent application, not true exactly-once. Kafka Connect source connectors such as Debezium deliver at-least-once by default. If a connector restarts mid-batch, you may receive duplicate events, and the ROW_NUMBER dedup absorbs them.
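
The idempotency argument is easy to check outside the warehouse. The sketch below models the dedup-then-merge logic with plain Python dicts standing in for the silver table; it illustrates the pattern and is not what dbt or Snowflake executes. The function names are hypothetical.

```python
def latest_per_key(events, key="order_id"):
    """Keep the newest event per key, ordered like the ROW_NUMBER window."""
    latest = {}
    for event in events:
        rank = (event["__source_ts_ms"], event["__ts_ms"])
        current = latest.get(event[key])
        if current is None or rank > (
            current["__source_ts_ms"],
            current["__ts_ms"],
        ):
            latest[event[key]] = event
    return latest


def merge(target, events):
    """Upsert the deduplicated batch into a copy of the target table."""
    result = dict(target)
    for order_id, event in latest_per_key(events).items():
        result[order_id] = {
            "status": event["status"],
            "captured_at": event["__ts_ms"],
            "_cdc_deleted": event["__op"] == "d",
        }
    return result


if __name__ == "__main__":
    paid = {"order_id": 1, "status": "paid", "__op": "u",
            "__source_ts_ms": 200, "__ts_ms": 210}
    batch = [
        {"order_id": 1, "status": "new", "__op": "c",
         "__source_ts_ms": 100, "__ts_ms": 110},
        paid,
        dict(paid),  # duplicate delivery after a connector restart
        {"order_id": 2, "status": "new", "__op": "d",
         "__source_ts_ms": 300, "__ts_ms": 310},
    ]
    once = merge({}, batch)
    twice = merge(once, batch)
    print(once == twice)
```

Applying the same batch a second time leaves the table unchanged, which is the property that makes retries and duplicate deliveries safe.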


The Three Production Gotchas Nobody Warns You About

1. Replication slot lag will tank your source database

A Postgres replication slot holds WAL segments until the slot consumer (Debezium) confirms they've been read. If the Kafka Connect worker goes down for 6 hours and no one is watching, the slot accumulates WAL. By default Postgres retains that WAL indefinitely, so a large enough backlog fills the disk and stalls writes on the primary. With max_slot_wal_keep_size set, Postgres instead invalidates the slot once the backlog exceeds the limit; Debezium then fails on reconnect, and recovery means creating a new slot and re-snapshotting.

Fix: Set max_slot_wal_keep_size in postgresql.conf (Postgres 13+) to a value your disk can absorb (1GB is a conservative starting point), and alert when the byte gap between pg_current_wal_lsn() and pg_replication_slots.confirmed_flush_lsn exceeds 500 MB, well before the limit is reached.

-- Monitor slot lag
SELECT
  slot_name,
  active,
  pg_size_pretty(
    pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)
  ) AS lag_size
FROM pg_replication_slots
WHERE slot_type = 'logical';
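
If your alerting runs outside the database, the same lag can be computed from the two LSN strings. Postgres prints an LSN as two hexadecimal numbers separated by a slash (high 32 bits, then low 32 bits). The sketch below assumes a 500 MB threshold taken as 500 × 1024² bytes; the function names are hypothetical.

```python
ALERT_BYTES = 500 * 1024 * 1024  # assumed reading of the "500 MB" threshold


def lsn_to_bytes(lsn):
    """Convert an LSN such as '16/B374D848' to an absolute byte position."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) + int(low, 16)


def slot_lag_bytes(current_wal_lsn, confirmed_flush_lsn):
    """Bytes of WAL the slot consumer has not yet confirmed."""
    return lsn_to_bytes(current_wal_lsn) - lsn_to_bytes(confirmed_flush_lsn)


def should_alert(current_wal_lsn, confirmed_flush_lsn):
    return slot_lag_bytes(current_wal_lsn, confirmed_flush_lsn) > ALERT_BYTES


if __name__ == "__main__":
    print(slot_lag_bytes("0/3000000", "0/1000000"))
    print(should_alert("1/0", "0/0"))
```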

2. Schema changes require a coordinated dance

When you add a column to the source table, Debezium continues streaming — but the new column appears only in events after the DDL. Old events in the bronze layer don't have that column. Your dbt model uses on_schema_change='append_new_columns', which handles the warehouse side. But if you backfill the bronze layer from WAL replay, the missing column in pre-DDL events will cause nulls, not failures — which is silent and wrong.

Fix: Treat every source DDL as a deployment event. Before applying the migration, pause the connector (PUT /connectors/{name}/pause), apply the DDL, then resume it (PUT /connectors/{name}/resume). Log the LSN at pause and resume time in your schema registry or a _cdc_schema_versions metadata table.
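
The pause, migrate, resume sequence is worth scripting so that the connector is always resumed, even when the migration fails. This is a minimal sketch in which every dependency is injected as a callable: `pause`, `resume`, `apply_ddl`, `current_lsn`, and `record` are hypothetical hooks you would wire to the Kafka Connect REST API, your migration tool, and your metadata table.

```python
def run_ddl_migration(pause, resume, apply_ddl, current_lsn, record):
    """Pause capture, apply the DDL, and always resume afterwards.

    record(phase, lsn) is called once after pausing and once after resuming
    so the LSN boundaries of the schema change are kept for later audits.
    """
    pause()
    record("paused", current_lsn())
    try:
        apply_ddl()
    finally:
        # Resume even if the DDL raised, so the slot does not keep piling up WAL.
        resume()
        record("resumed", current_lsn())


if __name__ == "__main__":
    log = []
    lsns = iter(["0/10", "0/20"])
    run_ddl_migration(
        pause=lambda: log.append("pause"),
        resume=lambda: log.append("resume"),
        apply_ddl=lambda: log.append("ddl"),
        current_lsn=lambda: next(lsns),
        record=lambda phase, lsn: log.append((phase, lsn)),
    )
    print(log)
```

Resuming inside `finally` is a deliberate choice: a failed migration should not also leave the replication slot idle.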

3. Initial snapshot and live stream overlap

snapshot.mode: initial runs a consistent SELECT * snapshot inside a transaction, then switches to streaming. There is a brief overlap window where an event committed after the snapshot transaction started but before the slot LSN catches up may appear twice: once in the snapshot rows and once in the stream.

Fix: The dbt unique_key MERGE absorbs this. However, keep snapshot.isolation.mode at repeatable_read, as set explicitly in the connector config above, rather than relying on a default. If you override it to read_committed for speed on large tables, the snapshot no longer reads from a single consistent view, and a snapshot row and a stream row for the same key can carry identical __source_ts_ms values. The MERGE still leaves one row per order_id, but the ROW_NUMBER tie-break between the two becomes arbitrary, so a stale snapshot row can win.


Monitoring That Actually Pages You

Four signals cover 90% of CDC pipeline failures:

| Signal | What it measures | Alert threshold |
| --- | --- | --- |
| Replication lag (bytes) | pg_wal_lsn_diff between current WAL and slot LSN | > 500 MB |
| Consumer group lag (Kafka) | Messages in topic minus committed offset per consumer group | > 10,000 messages OR > 5 min |
| Row count drift | COUNT(*) in silver table vs. source table (sampled hourly) | > 0.5% divergence |
| dbt run freshness | Time since last successful dbt job for each model | > 90 seconds past schedule |

# Kafka consumer lag — check with kafka-consumer-groups.sh (Kafka 3.6+)
kafka-consumer-groups.sh \
  --bootstrap-server kafka:9092 \
  --describe \
  --group debezium-sink-group \
  --timeout 5000

# Expected output:
# GROUP                TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# debezium-sink-group  cdc.orders_pg.public.orders    0          48291730        48291742        12

-- Row-count drift check (run hourly, alert if abs(drift_pct) > 0.5)
WITH source_count AS (
  -- Replace with your actual source query via Flowbasin or dblink
  SELECT 412853 AS row_count  -- sampled from source
),
warehouse_count AS (
  SELECT COUNT(*) AS row_count FROM silver.orders WHERE NOT _cdc_deleted
)
SELECT
  s.row_count                                          AS source_rows,
  w.row_count                                          AS warehouse_rows,
  ROUND(100.0 * (w.row_count - s.row_count) / NULLIF(s.row_count, 0), 3) AS drift_pct
FROM source_count s, warehouse_count w;
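
Once the four signals are collected, the paging decision itself is a few comparisons. The sketch below encodes the thresholds from the table above; the `Signals` dataclass and `breached` function are hypothetical names, and "500 MB" is taken as 500 × 1024² bytes.

```python
from dataclasses import dataclass


@dataclass
class Signals:
    slot_lag_bytes: int
    consumer_lag_messages: int
    consumer_lag_seconds: float
    drift_pct: float
    seconds_past_schedule: float


def breached(s):
    """Return the names of the signals that exceed their alert threshold."""
    alerts = []
    if s.slot_lag_bytes > 500 * 1024 ** 2:
        alerts.append("replication_lag")
    if s.consumer_lag_messages > 10_000 or s.consumer_lag_seconds > 300:
        alerts.append("consumer_lag")
    if abs(s.drift_pct) > 0.5:
        alerts.append("row_count_drift")
    if s.seconds_past_schedule > 90:
        alerts.append("dbt_freshness")
    return alerts


if __name__ == "__main__":
    healthy = Signals(12_000_000, 12, 4.0, 0.02, 10.0)
    degraded = Signals(900 * 1024 ** 2, 50, 400.0, -0.8, 30.0)
    print(breached(healthy))
    print(breached(degraded))
```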

Flowbasin exposes all four of these signals natively in its pipeline dashboard — replication slot lag, Kafka consumer offset, row-count drift, and model freshness — so you're not stitching together Prometheus exporters and Airflow SLAs by hand.


Managed vs. Self-Managed: Honest Trade-offs

| | Self-managed (Debezium + Kafka) | Managed CDC (Flowbasin, Airbyte, Fivetran) |
| --- | --- | --- |
| Latency | Sub-second achievable | 15 s to 5 min (varies by vendor) |
| Cost at scale | Infrastructure cost only; no per-row fees | Per-row or per-connector pricing gets expensive fast above 1B rows/day |
| Operational burden | High: Kafka cluster, Connect workers, JVM tuning, connector upgrades | Low: vendor manages it |
| Flexibility | Full control over schema mapping, transforms, routing | Constrained by what the vendor exposes |
| Delete handling | Full soft/hard delete support via tombstones | Varies; some vendors don't propagate hard deletes |
| Schema change handling | Manual coordination (see Gotcha #2) | Automated schema evolution, but sometimes silently drops columns |

The self-managed path makes sense when you need sub-5-second latency, you already operate Kafka for other workloads, or you have uncommon transform requirements that no connector vendor supports.

The managed path makes sense when you want to stop being a Kafka operator and focus on the transformations that actually add business value.

The DataBasin platform sits in the middle of this matrix — managed CDC connectors with full event-stream visibility and a schema registry, without per-row pricing at scale. You get operational simplicity without signing up for an invoice that scales linearly with your table growth.

The honest downside of any managed CDC tool: you give up the ability to write custom Kafka Streams transforms inline. If you need complex multi-topic joins or stateful enrichment at ingestion time, self-managed Kafka Streams is the only path.

Iceberg vs. Delta Lake for the bronze layer

If you're landing events into an open table format rather than raw Parquet:

Don't choose based on GitHub stars or recent announcements. Choose based on your query engine.


Putting It Together — End-to-End Checklist

  1. Set wal_level = logical (Postgres) or gtid_mode = ON (MySQL) and restart the source database.
  2. Create a dedicated replication user with minimum required grants — no superuser.
  3. Set max_slot_wal_keep_size = 1GB (Postgres 13+) to prevent unbounded WAL accumulation.
  4. Deploy Debezium 2.6 via Kafka Connect with snapshot.mode: initial and tombstones.on.delete: true.
  5. Verify connector reaches RUNNING state and snapshot completes before pointing any downstream jobs at the topic.
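  6. Configure the sink connector with rotate.interval.ms: 60000 so bronze files close roughly once a minute while events are flowing. With the RecordField timestamp extractor, rotation follows record timestamps, so add rotate.schedule.interval.ms if quiet topics must also flush on wall-clock time.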
  7. Add _row_rank = 1 dedup and __op-based soft delete logic to every dbt incremental model before going to production.
  8. Set unique_key and incremental_strategy='merge' on all CDC-sourced dbt models — never use append for CDC.
  9. Implement the four monitoring signals (slot lag, consumer lag, row-count drift, model freshness) with PagerDuty or equivalent alerting.
  10. Document and test the DDL migration runbook: pause connector → apply DDL → verify schema registry → resume connector.
  11. Run a chaos test: kill the Kafka Connect worker mid-snapshot, restart it, confirm no duplicate rows in the silver table after the next dbt run.
  12. Set a 90-day review to audit replication slot health, Kafka topic retention, and bronze storage costs — CDC generates high-volume append-only data and the bill creeps up.

FAQ

What is the difference between CDC and incremental load?

Incremental load queries the source table for rows modified since a watermark timestamp (e.g., WHERE updated_at > :last_run). CDC reads the database transaction log and captures every change event — insert, update, delete — without querying the table at all. The practical difference: incremental loads miss hard deletes, require an indexed updated_at column on every table, and still impose read load on the source. CDC adds operational complexity but eliminates all three problems.
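
The hard-delete gap is easiest to see side by side. In this toy sketch, dicts stand in for tables: row 1 is updated and row 2 is hard-deleted on the source after the last sync. The function names and the `updated_at` integers are illustrative only.

```python
def watermark_sync(target, source, last_run):
    """Upsert rows whose updated_at is newer than the watermark."""
    result = dict(target)
    for row in source.values():
        if row["updated_at"] > last_run:
            result[row["id"]] = row
    return result


def cdc_sync(target, events):
    """Replay change events in order, including deletes."""
    result = dict(target)
    for event in events:
        if event["op"] == "d":
            result.pop(event["id"], None)
        else:
            result[event["id"]] = event["row"]
    return result


if __name__ == "__main__":
    target = {
        1: {"id": 1, "status": "new", "updated_at": 10},
        2: {"id": 2, "status": "new", "updated_at": 10},
    }
    updated = {"id": 1, "status": "paid", "updated_at": 20}
    source_now = {1: updated}  # row 2 no longer exists on the source
    events = [{"op": "u", "id": 1, "row": updated}, {"op": "d", "id": 2}]

    print(sorted(watermark_sync(target, source_now, last_run=10)))
    print(sorted(cdc_sync(target, events)))
```

The watermark query never sees row 2 again, so it has nothing to act on and the stale row stays in the target; the change log carries an explicit delete event for it.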

How do you handle hard deletes in an incremental ELT pipeline?

With log-based CDC, a DELETE on the source emits a delete event (op = 'd'), followed by a tombstone (a record with the same key and a null value) when tombstones.on.delete is true. The ExtractNewRecordState SMT, with delete.handling.mode set to rewrite, turns the delete event into a regular record carrying __op = 'd' and a __deleted field set to "true". In the dbt model, you MERGE this record into the silver table as a soft delete (_cdc_deleted = true). Downstream gold models filter WHERE NOT _cdc_deleted. After a retention window (7 days in the example above), a post_hook physically removes the soft-deleted rows. This is the only reliable way to propagate hard deletes, because watermark-based incremental loads cannot detect them.
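
The retention step can be sketched in a few lines. This mirrors the post_hook predicate (soft-deleted and older than the retention window) over a list of row dicts; `purge_soft_deleted` is a hypothetical helper, not something dbt provides.

```python
from datetime import datetime, timedelta, timezone


def purge_soft_deleted(rows, now, retention=timedelta(days=7)):
    """Drop rows that were soft-deleted before the retention cutoff."""
    cutoff = now - retention
    return [
        row for row in rows
        if not (row["_cdc_deleted"] and row["_cdc_deleted_at"] < cutoff)
    ]


if __name__ == "__main__":
    now = datetime(2026, 4, 19, tzinfo=timezone.utc)
    rows = [
        {"order_id": 1, "_cdc_deleted": False, "_cdc_deleted_at": None},
        {"order_id": 2, "_cdc_deleted": True,
         "_cdc_deleted_at": now - timedelta(days=3)},
        {"order_id": 3, "_cdc_deleted": True,
         "_cdc_deleted_at": now - timedelta(days=8)},
    ]
    print([row["order_id"] for row in purge_soft_deleted(rows, now)])
```

Live rows and recently deleted rows survive; only deletes older than the window are physically removed.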

Is log-based CDC always necessary for production?

No. If your tables have reliable updated_at timestamps, hard deletes are rare or don't matter for your use case, and your source database can absorb the read load, watermark-based incremental loads are simpler to operate. Log-based CDC is the right choice when: (a) you need to capture deletes, (b) you need sub-minute latency, (c) your tables are large enough that SELECT queries during peak hours cause contention, or (d) you have tables without updated_at columns. The operational cost of Kafka + Debezium is non-trivial — don't pay it when you don't need to.

How do you handle schema changes in a CDC pipeline?

Schema changes are the most operationally disruptive event in a CDC pipeline. The recommended pattern: (1) Use on_schema_change='append_new_columns' in dbt so new source columns don't break the model. (2) Integrate Confluent Schema Registry or Apicurio with Debezium's Avro converter so incompatible schema changes are rejected at the connector level rather than silently corrupting downstream tables. (3) Treat any column rename or type change as a breaking change requiring a new topic, not an in-place migration. (4) For column drops on the source, add transforms.unwrap.add.fields to include schema version metadata so you can reconcile the bronze layer. Flowbasin handles schema evolution automatically for managed connectors; Reportbasin surfaces schema drift alerts so your BI layer doesn't silently start returning nulls.
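
A lightweight drift check on the bronze side can flag added or missing columns before they surface as nulls downstream. The sketch below compares an unwrapped event's columns with the set the silver model expects, ignoring the double-underscore metadata fields; `schema_drift` is a hypothetical helper and does not replace a schema registry.

```python
def schema_drift(known_columns, event):
    """Compare an event's source columns with the expected column set."""
    event_columns = {name for name in event if not name.startswith("__")}
    return {
        "added": event_columns - known_columns,
        "missing": known_columns - event_columns,
    }


if __name__ == "__main__":
    known = {"order_id", "customer_id", "status", "total_amount"}
    event = {
        "order_id": 7,
        "customer_id": 3,
        "status": "paid",
        "coupon_code": "SPRING",  # new source column
        "__op": "u",
        "__ts_ms": 1005,
    }
    print(schema_drift(known, event))
```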


Conclusion

You now have a complete blueprint for a production-grade CDC incremental ELT pipeline: WAL-based capture with Debezium 2.6, a bronze landing zone, idempotent MERGE models in dbt 1.8, and four monitoring signals that give you real coverage without alert fatigue. The hardest parts aren't the code — they're the replication slot hygiene, the DDL migration discipline, and the chaos testing you need to do before your first production incident does it for you.

If you want to run this architecture without managing Kafka Connect clusters yourself, try DataBasin free. Flowbasin handles the CDC connectors, Lakebasin runs the SQL analytics layer, and Reportbasin turns the output into governed AI-powered reports. The 12-item checklist above still applies, but items 1–6 become a UI workflow rather than a set of config files.