
Last Updated: 3/19/2026


PostgreSQL/Debezium Connector

The Debezium connector enables Feldera to consume change data capture (CDC) streams from PostgreSQL databases. This connector reads from Kafka topics populated by Debezium, allowing you to process database changes in real time.

Overview

Debezium is a distributed platform for change data capture that monitors database transaction logs and publishes change events to Kafka. Feldera consumes these events through the Kafka connector with Debezium-specific format handling.

The typical architecture:

PostgreSQL → Debezium → Kafka → Feldera

Prerequisites

Before using the Debezium connector, you need:

  1. PostgreSQL database with logical replication enabled
  2. Debezium PostgreSQL connector deployed and configured
  3. Kafka cluster receiving Debezium change events
  4. Feldera pipeline with tables matching your database schema

Enabling Logical Replication in PostgreSQL

Edit postgresql.conf:

```
wal_level = logical
max_replication_slots = 4
max_wal_senders = 4
```

Restart PostgreSQL for changes to take effect.
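Before deploying Debezium, it can help to verify these settings programmatically. The sketch below is a hypothetical helper (parse_conf and check_replication_settings are not part of PostgreSQL, Debezium, or Feldera). It assumes the simple name = value syntax with optional # comments, ignores include directives, and skips max_replication_slots and max_wal_senders when the file does not set them, since the server defaults then apply.

```python
from typing import Dict, List


def parse_conf(text: str) -> Dict[str, str]:
    """Parse 'name = value' lines; later duplicates override earlier ones, as in PostgreSQL."""
    settings: Dict[str, str] = {}
    for raw in text.splitlines():
        # Drop trailing comments (this sketch does not handle '#' inside quoted values).
        line = raw.split("#", 1)[0].strip()
        if "=" not in line:
            continue  # blank line, comment, or syntax this sketch does not support
        name, value = line.split("=", 1)
        settings[name.strip()] = value.strip().strip("'")
    return settings


def check_replication_settings(text: str, min_slots: int = 1, min_senders: int = 1) -> List[str]:
    """Return a list of problems; an empty list means the settings look usable for Debezium."""
    settings = parse_conf(text)
    problems: List[str] = []
    if settings.get("wal_level") != "logical":
        problems.append(f"wal_level is {settings.get('wal_level')!r}, expected 'logical'")
    for key, minimum in (("max_replication_slots", min_slots), ("max_wal_senders", min_senders)):
        value = settings.get(key)
        if value is None:
            continue  # not set in this file, so the server default applies
        if not value.isdigit() or int(value) < minimum:
            problems.append(f"{key} is {value!r}, expected at least {minimum}")
    return problems
```

For example, check_replication_settings(open("postgresql.conf").read()) returns an empty list for the settings shown above. After the restart, running SHOW wal_level; in psql confirms the value the server is actually using.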

Deploying Debezium

Deploy the Debezium PostgreSQL connector using Kafka Connect:

```json
{
  "name": "postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "secret",
    "database.dbname": "mydb",
    "topic.prefix": "dbserver1",
    "table.include.list": "public.users,public.orders",
    "plugin.name": "pgoutput"
  }
}
```

This creates Kafka topics named <topic.prefix>.<schema>.<table>, here dbserver1.public.users and dbserver1.public.orders, containing change events. Debezium 2.0 replaced the older database.server.name property with topic.prefix; on Debezium 1.x, set database.server.name instead.
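The topic names Feldera must subscribe to follow directly from the Debezium configuration. The hypothetical helper below derives them, assuming the default topic naming (no topic-routing transforms) and an include list of literal schema.table names; Debezium actually interprets table.include.list entries as regular expressions, which this sketch does not expand.

```python
from typing import List


def debezium_topics(topic_prefix: str, table_include_list: str) -> List[str]:
    """Map a comma-separated table.include.list to <prefix>.<schema>.<table> topic names."""
    tables = [t.strip() for t in table_include_list.split(",") if t.strip()]
    return [f"{topic_prefix}.{t}" for t in tables]
```

With the configuration above, debezium_topics("dbserver1", "public.users,public.orders") yields the two topic names used in the Feldera connector configuration.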

Connector Configuration

Configure a Kafka input connector with Debezium format to consume CDC events:

```yaml
inputs:
  users:
    stream: users
    connector_config:
      transport:
        name: kafka_input
        config:
          bootstrap.servers: "localhost:9092"
          topics:
            - "dbserver1.public.users"
          auto.offset.reset: "earliest"
      format:
        name: json
        config:
          update_format: "debezium"
```

Key Configuration Options

Transport (Kafka):

  • bootstrap.servers — Kafka broker addresses
  • topics — List of Debezium topics to consume
  • auto.offset.reset — Where to start reading when the consumer group has no committed offset: "earliest" or "latest"
  • group.id — Consumer group ID (optional, auto-generated if not specified)

Format:

  • name: json — Messages are parsed as JSON, so Debezium's Kafka Connect worker must use the JSON converter
  • update_format: "debezium" — Enables Debezium-specific parsing

Debezium Message Format

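Each Debezium change event is an envelope with before, after, op, and ts_ms fields. Real events also carry a source metadata block, omitted below, and when the Kafka Connect JSON converter has schemas enabled, the envelope is wrapped in a payload field. The before field holds the complete old row, as in these examples, only if the PostgreSQL table uses REPLICA IDENTITY FULL; with the default replica identity it contains at most the primary-key columns.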

Insert Event

```json
{
  "before": null,
  "after": { "id": 1, "name": "Alice", "email": "alice@example.com" },
  "op": "c",
  "ts_ms": 1678901234567
}
```

Update Event

```json
{
  "before": { "id": 1, "name": "Alice", "email": "alice@example.com" },
  "after": { "id": 1, "name": "Alice", "email": "alice@newdomain.com" },
  "op": "u",
  "ts_ms": 1678901234568
}
```

Delete Event

```json
{
  "before": { "id": 1, "name": "Alice", "email": "alice@newdomain.com" },
  "after": null,
  "op": "d",
  "ts_ms": 1678901234569
}
```

Feldera automatically interprets these operations:

  • op: "c" (create) → Insert
  • op: "u" (update) → Delete old record + Insert new record
  • op: "d" (delete) → Delete
  • op: "r" (read/snapshot) → Insert
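The mapping above can be modeled as weighted changes, where each record is inserted with weight +1 or deleted with weight -1. This is a simplified illustration, not Feldera's parser: debezium_to_changes is a hypothetical name, and the sketch rejects ops it does not recognize (such as Debezium's "t" truncate events) instead of guessing.

```python
from typing import Any, Dict, List, Optional, Tuple

Row = Dict[str, Any]


def debezium_to_changes(event: Dict[str, Any]) -> List[Tuple[int, Optional[Row]]]:
    """Translate one Debezium envelope into (weight, row) pairs: +1 inserts, -1 deletes."""
    env = event.get("payload", event)  # unwrap {"schema": ..., "payload": ...} if present
    op = env.get("op")
    before, after = env.get("before"), env.get("after")
    if op in ("c", "r"):  # create and snapshot read both insert the new row
        return [(+1, after)]
    if op == "u":  # update: retract the old row, then insert the new one
        return [(-1, before), (+1, after)]
    if op == "d":
        return [(-1, before)]
    raise ValueError(f"unsupported Debezium op: {op!r}")
```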

Schema Mapping

Define Feldera tables that match your PostgreSQL schema:

PostgreSQL table:

```sql
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    email VARCHAR(100) UNIQUE NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
```

Feldera table:

```sql
CREATE TABLE users (
    id INTEGER NOT NULL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    email VARCHAR(100) NOT NULL,
    created_at TIMESTAMP
);
```

Data Type Mapping

Common PostgreSQL to Feldera type mappings:

| PostgreSQL | Feldera |
|---|---|
| SERIAL, INTEGER | INTEGER |
| BIGSERIAL, BIGINT | BIGINT |
| REAL | REAL |
| DOUBLE PRECISION | DOUBLE |
| NUMERIC(p,s) | DECIMAL(p,s) |
| VARCHAR(n) | VARCHAR(n) |
| TEXT | VARCHAR |
| TIMESTAMP | TIMESTAMP |
| DATE | DATE |
| BOOLEAN | BOOLEAN |
| JSON, JSONB | VARCHAR (as JSON string) |
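When translating many PostgreSQL column definitions into Feldera DDL, the table above can be encoded as a lookup. The hypothetical to_feldera_type helper below covers only the types listed in the table and raises on anything else (for example UUID) rather than guessing a mapping.

```python
import re

# Unparameterized rows of the mapping table; NUMERIC(p,s) and VARCHAR(n) are handled separately.
_SIMPLE = {
    "SERIAL": "INTEGER",
    "INTEGER": "INTEGER",
    "BIGSERIAL": "BIGINT",
    "BIGINT": "BIGINT",
    "REAL": "REAL",
    "DOUBLE PRECISION": "DOUBLE",
    "TEXT": "VARCHAR",
    "TIMESTAMP": "TIMESTAMP",
    "DATE": "DATE",
    "BOOLEAN": "BOOLEAN",
    "JSON": "VARCHAR",  # stored as a JSON string
    "JSONB": "VARCHAR",  # stored as a JSON string
}


def to_feldera_type(pg_type: str) -> str:
    """Map a PostgreSQL column type to the Feldera type listed in the table."""
    t = " ".join(pg_type.upper().split())  # normalize case and whitespace
    m = re.fullmatch(r"(NUMERIC|VARCHAR)\s*\((.*)\)", t)
    if m:
        base, args = m.groups()
        args = ",".join(a.strip() for a in args.split(","))
        name = "DECIMAL" if base == "NUMERIC" else "VARCHAR"
        return f"{name}({args})"
    if t in _SIMPLE:
        return _SIMPLE[t]
    raise KeyError(f"no mapping for PostgreSQL type {pg_type!r}")
```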

Handling Initial Snapshots

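With Debezium's default snapshot.mode (initial), the connector snapshots the existing table contents when it starts with no previously recorded offsets. These snapshot records have op: "r" and are treated as inserts by Feldera.

To make sure Feldera reads the snapshot records as well as all later changes, consume the topic from the earliest offset: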

```yaml
transport:
  name: kafka_input
  config:
    bootstrap.servers: "localhost:9092"
    topics:
      - "dbserver1.public.users"
    auto.offset.reset: "earliest"  # Start from beginning
```
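Because snapshot records arrive on the same topic as ordinary op "r" events, followed by the streaming changes, reading from the earliest offset reconstructs the full table. The sketch below replays a topic's events in order into a dictionary keyed by primary key. apply_events is hypothetical; it assumes a single-column key named id and skips None values, which stand in for the tombstone records Debezium emits after deletes by default.

```python
from typing import Any, Dict, Iterable, Optional

Row = Dict[str, Any]


def apply_events(events: Iterable[Optional[Dict[str, Any]]], key: str = "id") -> Dict[Any, Row]:
    """Replay Debezium envelopes in topic order into a dict keyed by primary key."""
    table: Dict[Any, Row] = {}
    for event in events:
        if event is None:
            continue  # Kafka tombstone (null value) that follows a delete
        env = event.get("payload", event)
        op = env["op"]
        if op == "d" or (op == "u" and env.get("before")):
            table.pop(env["before"][key], None)  # remove the old version of the row
        if op in ("r", "c", "u"):
            row = env["after"]  # snapshot reads, inserts, and updates all carry the new row
            table[row[key]] = row
    return table
```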

Multiple Tables

Process multiple tables by configuring multiple input connectors:

```yaml
inputs:
  users:
    stream: users
    connector_config:
      transport:
        name: kafka_input
        config:
          bootstrap.servers: "localhost:9092"
          topics:
            - "dbserver1.public.users"
      format:
        name: json
        config:
          update_format: "debezium"
  orders:
    stream: orders
    connector_config:
      transport:
        name: kafka_input
        config:
          bootstrap.servers: "localhost:9092"
          topics:
            - "dbserver1.public.orders"
      format:
        name: json
        config:
          update_format: "debezium"
```
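Because the per-table blocks differ only in the table and topic names, they can also be generated. This sketch assumes the inputs layout shown above and Debezium's default <prefix>.<schema>.<table> topic naming; debezium_inputs and its default arguments (including reading from "earliest") are hypothetical. It returns a plain dict, so json.dumps (or yaml.safe_dump, if PyYAML is installed) can serialize it.

```python
from typing import Any, Dict, Iterable


def debezium_inputs(tables: Iterable[str], bootstrap_servers: str,
                    topic_prefix: str = "dbserver1", schema: str = "public",
                    offset_reset: str = "earliest") -> Dict[str, Any]:
    """Build one Kafka input connector with Debezium JSON format per table."""
    inputs: Dict[str, Any] = {}
    for table in tables:
        inputs[table] = {
            "stream": table,
            "connector_config": {
                "transport": {
                    "name": "kafka_input",
                    "config": {
                        "bootstrap.servers": bootstrap_servers,
                        "topics": [f"{topic_prefix}.{schema}.{table}"],
                        "auto.offset.reset": offset_reset,
                    },
                },
                "format": {"name": "json", "config": {"update_format": "debezium"}},
            },
        }
    return {"inputs": inputs}
```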

Joins and Aggregations

Once CDC data flows into Feldera, you can perform joins and aggregations:

```sql
-- Join users and orders
CREATE VIEW user_orders AS
SELECT u.name, u.email, o.order_id, o.amount, o.created_at
FROM users u
JOIN orders o ON u.id = o.user_id;

-- Aggregate order totals by user (grouping by the key keeps users who share a name separate)
CREATE VIEW user_totals AS
SELECT u.id, u.name, COUNT(*) AS order_count, SUM(o.amount) AS total_amount
FROM users u
JOIN orders o ON u.id = o.user_id
GROUP BY u.id, u.name;
```

These views update incrementally as CDC events arrive.
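To illustrate what updating incrementally means, the toy model below maintains the per-user COUNT(*) and SUM(amount) of user_totals directly from weighted order changes (+1 for an insert, -1 for a delete), so each change touches only its own user's group. It is a simplified sketch, not Feldera's engine: OrderTotals is hypothetical, it keys groups by user_id, and it omits the join with users.

```python
from collections import defaultdict
from decimal import Decimal
from typing import Any, Dict, Iterable, Tuple


class OrderTotals:
    """Per-user COUNT(*) and SUM(amount), updated from (weight, order) deltas."""

    def __init__(self) -> None:
        self.count: Dict[Any, int] = defaultdict(int)
        self.total: Dict[Any, Decimal] = defaultdict(Decimal)

    def apply(self, changes: Iterable[Tuple[int, Dict[str, Any]]]) -> None:
        for weight, order in changes:
            uid = order["user_id"]
            self.count[uid] += weight
            self.total[uid] += weight * Decimal(str(order["amount"]))
            if self.count[uid] == 0:
                # No orders left for this user: drop the group, as GROUP BY would.
                del self.count[uid]
                del self.total[uid]

    def snapshot(self) -> Dict[Any, Tuple[int, Decimal]]:
        return {uid: (c, self.total[uid]) for uid, c in self.count.items()}
```

An update to an order arrives as a -1 for the old row and a +1 for the new one, so only that user's count and sum are adjusted.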

Error Handling

Schema Mismatches

If the Debezium message schema doesn’t match your Feldera table, parsing errors occur. Check pipeline logs:

```python
for log_line in pipeline.logs():
    print(log_line)
```

Common issues:

  • Missing columns in Feldera table
  • Type mismatches between PostgreSQL and Feldera
  • NULL values in NOT NULL columns
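When a parse error points at a schema mismatch, it can help to check a sample event offline against the table definition. The hypothetical check_row helper below covers two of the issues listed above: fields the Feldera table does not declare, and null values in NOT NULL columns. Type checks are omitted because how Debezium encodes values (timestamps and decimals in particular) depends on its converter and connector settings.

```python
from typing import Any, Dict, List, Set


def check_row(row: Dict[str, Any], columns: Set[str], not_null: Set[str]) -> List[str]:
    """Report mismatches between a Debezium 'after' row and a Feldera table definition."""
    problems: List[str] = []
    for name in sorted(set(row) - columns):
        problems.append(f"column {name!r} is not in the Feldera table")
    for name in sorted(not_null):
        if row.get(name) is None:
            problems.append(f"NOT NULL column {name!r} is missing or null")
    return problems
```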

Kafka Connection Issues

If the Kafka connector cannot connect:

```python
stats = pipeline.input_connector_stats("users", "users")
print(stats.status)  # Check for errors
```

Verify:

  • Kafka broker addresses are correct
  • Topics exist and are accessible
  • Network connectivity between Feldera and Kafka
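The first and last checks can be scripted from the Feldera host. The sketch below uses only the Python standard library; parse_bootstrap_servers and reachable are hypothetical helpers, and the parser assumes every entry has an explicit port and a hostname or IPv4 address (no bracketed IPv6).

```python
import socket
from typing import List, Tuple


def parse_bootstrap_servers(value: str) -> List[Tuple[str, int]]:
    """Split 'host1:9092,host2:9092' into (host, port) pairs."""
    servers = []
    for entry in value.split(","):
        host, _, port = entry.strip().rpartition(":")
        servers.append((host, int(port)))
    return servers


def reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

A successful connection only shows that the bootstrap address is reachable. Kafka clients then connect to the brokers' advertised listeners, so those addresses must also resolve and be reachable from the Feldera host.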

Performance Considerations

Batching

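The Kafka input connector processes messages in batches. max_batch_size limits the number of records per batch, and max_queued_records sets the backpressure threshold, the number of buffered records at which the connector pauses reading: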

```yaml
connector_config:
  max_batch_size: 10000
  max_queued_records: 1000000
```
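The toy model below illustrates how the two limits interact, assuming max_queued_records acts as a backpressure threshold. It is not Feldera's implementation, and BoundedInput is a hypothetical class: each step takes at most max_batch_size records, and reading is paused while the backlog is at or above max_queued_records.

```python
from collections import deque
from typing import Any, Deque, List


class BoundedInput:
    """Toy model of a per-step batch limit plus a backpressure threshold."""

    def __init__(self, max_batch_size: int, max_queued_records: int) -> None:
        self.max_batch_size = max_batch_size
        self.max_queued_records = max_queued_records
        self.queue: Deque[Any] = deque()

    @property
    def paused(self) -> bool:
        # Stop pulling from Kafka while the backlog is at or above the threshold.
        return len(self.queue) >= self.max_queued_records

    def push(self, records: List[Any]) -> None:
        self.queue.extend(records)

    def next_batch(self) -> List[Any]:
        # Hand at most max_batch_size records to the next processing step.
        n = min(self.max_batch_size, len(self.queue))
        return [self.queue.popleft() for _ in range(n)]
```

Larger batches amortize per-step overhead, while a higher queue threshold absorbs bursts at the cost of memory.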

Parallelism

For high-throughput CDC streams, increase pipeline workers:

```json
{ "workers": 16 }
```

Kafka Consumer Configuration

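Other Kafka consumer properties can be set alongside bootstrap.servers and topics. For example, raising fetch.min.bytes lets the broker accumulate more data per fetch response, trading some latency for throughput, while fetch.wait.max.ms bounds how long the broker waits:

```yaml
transport:
  name: kafka_input
  config:
    bootstrap.servers: "localhost:9092"
    topics:
      - "dbserver1.public.users"
    fetch.min.bytes: "1048576"  # wait for at least 1 MiB per fetch...
    fetch.wait.max.ms: "500"    # ...but no longer than 500 ms (librdkafka name; the Java client calls it fetch.max.wait.ms)
```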

Complete Example

Here’s a complete pipeline configuration for PostgreSQL CDC:

```sql
-- Define tables matching PostgreSQL schema
CREATE TABLE users (
    id INTEGER NOT NULL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    email VARCHAR(100) NOT NULL,
    created_at TIMESTAMP
);

CREATE TABLE orders (
    order_id INTEGER NOT NULL PRIMARY KEY,
    user_id INTEGER NOT NULL,
    amount DECIMAL(10, 2) NOT NULL,
    created_at TIMESTAMP
);

-- Create a view for real-time analytics.
-- COUNT(o.order_id) counts only matched orders, so users without orders get 0
-- (COUNT(*) would count the NULL row produced by the LEFT JOIN as 1).
CREATE VIEW user_order_summary AS
SELECT
    u.id,
    u.name,
    COUNT(o.order_id) AS order_count,
    SUM(o.amount) AS total_spent,
    MAX(o.created_at) AS last_order_date
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
GROUP BY u.id, u.name;
```

Connector configuration:

```yaml
inputs:
  users:
    stream: users
    connector_config:
      transport:
        name: kafka_input
        config:
          bootstrap.servers: "kafka:9092"
          topics:
            - "dbserver1.public.users"
          auto.offset.reset: "earliest"
      format:
        name: json
        config:
          update_format: "debezium"
  orders:
    stream: orders
    connector_config:
      transport:
        name: kafka_input
        config:
          bootstrap.servers: "kafka:9092"
          topics:
            - "dbserver1.public.orders"
          auto.offset.reset: "earliest"
      format:
        name: json
        config:
          update_format: "debezium"
```
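When testing the pipeline, a small batch reference can confirm what user_order_summary should contain for given table contents. The sketch below is hypothetical test scaffolding, not a Feldera API: it recomputes the view from scratch over lists of row dicts. It follows SQL LEFT JOIN semantics, so a user without orders yields order_count 0 (the result of COUNT(o.order_id); COUNT(*) would count the NULL-extended row as 1) and None for total_spent and last_order_date. It assumes created_at values are mutually comparable, such as datetime objects or ISO-formatted strings.

```python
from decimal import Decimal
from typing import Any, Dict, List, Optional, Tuple


def user_order_summary(users: List[Dict[str, Any]], orders: List[Dict[str, Any]]) -> List[Tuple[Any, ...]]:
    """Non-incremental reference evaluation of the user_order_summary view."""
    by_user: Dict[Any, List[Dict[str, Any]]] = {u["id"]: [] for u in users}
    for o in orders:
        if o["user_id"] in by_user:  # orders without a matching user do not appear
            by_user[o["user_id"]].append(o)
    rows: List[Tuple[Any, ...]] = []
    for u in users:
        mine = by_user[u["id"]]
        # SUM over no rows is NULL in SQL; MAX ignores NULLs and is NULL if nothing remains.
        total: Optional[Decimal] = sum((Decimal(str(o["amount"])) for o in mine), Decimal(0)) if mine else None
        times = [o["created_at"] for o in mine if o["created_at"] is not None]
        last = max(times) if times else None
        rows.append((u["id"], u["name"], len(mine), total, last))
    return rows
```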

What’s Next