Last Updated: 3/19/2026
PostgreSQL/Debezium Connector
The Debezium connector enables Feldera to consume change data capture (CDC) streams from PostgreSQL databases. Feldera does not connect to PostgreSQL directly. Instead, it reads the Kafka topics that Debezium populates, so you can process database changes in real time.
Overview
Debezium is a distributed platform for change data capture that monitors database transaction logs and publishes change events to Kafka. Feldera consumes these events through the Kafka connector with Debezium-specific format handling.
The typical architecture:
PostgreSQL → Debezium → Kafka → Feldera
Prerequisites
Before using the Debezium connector, you need:
- PostgreSQL database with logical replication enabled
- Debezium PostgreSQL connector deployed and configured
- Kafka cluster receiving Debezium change events
- Feldera pipeline with tables matching your database schema
Enabling Logical Replication in PostgreSQL
Edit postgresql.conf:
```
wal_level = logical
max_replication_slots = 4
max_wal_senders = 4
```
Restart PostgreSQL for the changes to take effect.
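Before deploying Debezium, you can run a quick pre-flight check on the configuration file. The sketch below is a hypothetical helper, not part of Feldera or Debezium. It parses simple `key = value` lines from the text of postgresql.conf and reports settings that fall short of the values above. It assumes no `#` characters inside values and ignores include directives. Settings missing from the file fall back to the PostgreSQL 10+ defaults. On a running server, `SHOW wal_level;` reports the value actually in effect.

```python
import re

# Settings this guide relies on. If a setting is absent from the file,
# the server default applies (PostgreSQL 10+ defaults shown here).
DEFAULTS = {"wal_level": "replica", "max_replication_slots": "10", "max_wal_senders": "10"}
REQUIRED = {"wal_level": "logical"}
MINIMUMS = {"max_replication_slots": 4, "max_wal_senders": 4}

# Matches `key = value` or `key value`; the `=` is optional in postgresql.conf.
_LINE = re.compile(r"^\s*([A-Za-z_][A-Za-z0-9_.]*)\s*=?\s*(.*?)\s*$")


def parse_conf(text: str) -> dict:
    """Parse simple postgresql.conf lines; later lines override earlier ones."""
    settings = {}
    for raw in text.splitlines():
        line = raw.split("#", 1)[0]  # drop comments (assumes no '#' inside values)
        match = _LINE.match(line)
        if match and match.group(2):
            settings[match.group(1).lower()] = match.group(2).strip("'\"")
    return settings


def check_replication_settings(text: str) -> list:
    """Return human-readable problems; an empty list means the checked settings are OK."""
    settings = {**DEFAULTS, **parse_conf(text)}
    problems = []
    for key, expected in REQUIRED.items():
        if settings[key].lower() != expected:
            problems.append(f"{key} should be {expected!r}, found {settings[key]!r}")
    for key, minimum in MINIMUMS.items():
        value = settings[key]
        if not value.isdigit() or int(value) < minimum:
            problems.append(f"{key} should be at least {minimum}, found {value!r}")
    return problems
```

To use it, call `check_replication_settings(open("postgresql.conf").read())` and fix anything it prints before restarting the server.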
Deploying Debezium
Deploy the Debezium PostgreSQL connector using Kafka Connect:
```json
{
  "name": "postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "secret",
    "database.dbname": "mydb",
    "topic.prefix": "dbserver1",
    "table.include.list": "public.users,public.orders",
    "plugin.name": "pgoutput"
  }
}
```
This creates Kafka topics named `<topic.prefix>.<schema>.<table>`, here dbserver1.public.users and dbserver1.public.orders, containing change events. Debezium 2.0 replaced the `database.server.name` property with `topic.prefix`. On Debezium 1.x, set `database.server.name` instead.
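Kafka Connect accepts connector definitions through its REST API (`POST /connectors`). The sketch below uses only the Python standard library to build and send that request, and to derive the topic names Debezium will create. The helper names are hypothetical. The sketch assumes `table.include.list` holds plain table names, although Debezium actually interprets the entries as regular expressions. It reads `topic.prefix` and falls back to the 1.x `database.server.name`.

```python
import json
import urllib.request

# Connector definition from above (placeholder host and credentials).
CONNECTOR = {
    "name": "postgres-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "debezium",
        "database.password": "secret",
        "database.dbname": "mydb",
        "topic.prefix": "dbserver1",
        "table.include.list": "public.users,public.orders",
        "plugin.name": "pgoutput",
    },
}


def expected_topics(config: dict) -> list:
    """Topic names Debezium derives as <prefix>.<schema>.<table>.

    Assumes table.include.list holds plain names (it actually accepts regexes).
    """
    prefix = config.get("topic.prefix") or config["database.server.name"]  # 1.x name
    tables = [t.strip() for t in config["table.include.list"].split(",") if t.strip()]
    return [f"{prefix}.{table}" for table in tables]


def build_connector_request(connect_url: str, connector: dict) -> urllib.request.Request:
    """Build a POST /connectors request for the Kafka Connect REST API."""
    return urllib.request.Request(
        url=connect_url.rstrip("/") + "/connectors",
        data=json.dumps(connector).encode("utf-8"),
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )


def register_connector(connect_url: str, connector: dict) -> dict:
    """Submit the connector; Kafka Connect answers with the created connector's info."""
    request = build_connector_request(connect_url, connector)
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.load(response)
```

If Kafka Connect's REST API listens on its default port, calling `register_connector("http://localhost:8083", CONNECTOR)` registers the connector. `expected_topics(CONNECTOR["config"])` lists the topics to put in the Feldera input configuration.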
Connector Configuration
Configure a Kafka input connector with Debezium format to consume CDC events:
```yaml
inputs:
  users:
    stream: users
    connector_config:
      transport:
        name: kafka_input
        config:
          bootstrap.servers: "localhost:9092"
          topics:
            - "dbserver1.public.users"
          auto.offset.reset: "earliest"
      format:
        name: json
        config:
          update_format: "debezium"
```
Key Configuration Options
Transport (Kafka):
- `bootstrap.servers`: Kafka broker addresses
- `topics`: list of Debezium topics to consume
- `auto.offset.reset`: where to start reading when the consumer group has no committed offset, either `"earliest"` or `"latest"`
- `group.id`: consumer group ID (optional; auto-generated if not specified)
Format:
- `name: json`: parse messages as JSON, the serialization Debezium uses in this setup
- `update_format: "debezium"`: enables parsing of the Debezium change-event envelope
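If you generate pipeline configuration from code, for example for many tables, a small helper can produce the same structure as data. The sketch below is a hypothetical helper, not part of any Feldera SDK. It returns plain dictionaries that mirror the YAML above and prints them as JSON. The function names and default values are assumptions.

```python
import json
from typing import Optional


def debezium_input(table: str, topic: str, bootstrap_servers: str = "localhost:9092",
                   offset_reset: str = "earliest", group_id: Optional[str] = None) -> dict:
    """One input entry with the same shape as the YAML example above."""
    kafka = {
        "bootstrap.servers": bootstrap_servers,
        "topics": [topic],
        "auto.offset.reset": offset_reset,
    }
    if group_id is not None:
        kafka["group.id"] = group_id  # only set when given; otherwise auto-generated
    return {
        "stream": table,
        "connector_config": {
            "transport": {"name": "kafka_input", "config": kafka},
            "format": {"name": "json", "config": {"update_format": "debezium"}},
        },
    }


def debezium_inputs(prefix: str, schema: str, tables: list, **options) -> dict:
    """Build an `inputs` section, one connector per table, using Debezium's topic naming."""
    return {"inputs": {t: debezium_input(t, f"{prefix}.{schema}.{t}", **options) for t in tables}}


if __name__ == "__main__":
    print(json.dumps(debezium_inputs("dbserver1", "public", ["users", "orders"]), indent=2))
```

Generating the entries from one list of table names keeps the topic names and stream names consistent. This is the part most easily mistyped when the configuration is written by hand.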
Debezium Message Format
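Each Debezium change event carries the row state before and after the change (`before` and `after`), the operation type (`op`), and a timestamp in milliseconds (`ts_ms`). The examples below are simplified. Real events also include a `source` block with metadata about the originating database and table.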
Insert Event
```json
{
  "before": null,
  "after": {
    "id": 1,
    "name": "Alice",
    "email": "alice@example.com"
  },
  "op": "c",
  "ts_ms": 1678901234567
}
```
Update Event
```json
{
  "before": {
    "id": 1,
    "name": "Alice",
    "email": "alice@example.com"
  },
  "after": {
    "id": 1,
    "name": "Alice",
    "email": "alice@newdomain.com"
  },
  "op": "u",
  "ts_ms": 1678901234568
}
```
Delete Event
```json
{
  "before": {
    "id": 1,
    "name": "Alice",
    "email": "alice@newdomain.com"
  },
  "after": null,
  "op": "d",
  "ts_ms": 1678901234569
}
```
Feldera automatically interprets these operations:
- `op: "c"` (create) → insert
- `op: "u"` (update) → delete the old record (`before`) and insert the new record (`after`)
- `op: "d"` (delete) → delete
- `op: "r"` (read, i.e. snapshot) → insert
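The operation mapping above can be expressed as a small function. This is an illustrative model, not Feldera's parser. It represents each change as a (weight, record) pair, with +1 for an insert and -1 for a delete. It assumes events arrive as JSON strings like the examples above, optionally wrapped in the `payload` envelope that Kafka Connect's JSON converter adds when schemas are enabled.

```python
import json
from typing import Any, List, Optional, Tuple

Change = Tuple[int, dict]  # (+1 = insert or -1 = delete, record)


def debezium_to_changes(message: Optional[str]) -> List[Change]:
    """Translate one Debezium change event into weighted inserts/deletes."""
    if message is None:
        return []  # Kafka tombstone emitted after a delete: nothing to apply
    event: Any = json.loads(message)
    if event is None:
        return []
    # Events serialized with JSON schemas enabled wrap the change in "payload".
    event = event.get("payload", event)
    op, before, after = event["op"], event.get("before"), event.get("after")
    if op in ("c", "r"):          # create or snapshot read
        return [(+1, after)]
    if op == "u":                 # update: retract the old row, add the new one
        # Debezium may omit the before image, depending on the table's replica identity.
        old = [(-1, before)] if before is not None else []
        return old + [(+1, after)]
    if op == "d":                 # delete
        return [(-1, before)]
    raise ValueError(f"unsupported Debezium op: {op!r}")
```

Representing an update as a retraction plus an insertion means downstream views only ever need to handle two kinds of change.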
Schema Mapping
Define Feldera tables that match your PostgreSQL schema:
PostgreSQL table:
```sql
CREATE TABLE users (
  id SERIAL PRIMARY KEY,
  name VARCHAR(100) NOT NULL,
  email VARCHAR(100) UNIQUE NOT NULL,
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
```
Feldera table:
```sql
CREATE TABLE users (
  id INTEGER NOT NULL PRIMARY KEY,
  name VARCHAR(100) NOT NULL,
  email VARCHAR(100) NOT NULL,
  created_at TIMESTAMP
);
```
Data Type Mapping
Common PostgreSQL to Feldera type mappings:
| PostgreSQL | Feldera |
|---|---|
| SERIAL, INTEGER | INTEGER |
| BIGSERIAL, BIGINT | BIGINT |
| REAL | REAL |
| DOUBLE PRECISION | DOUBLE |
| NUMERIC(p,s) | DECIMAL(p,s) |
| VARCHAR(n) | VARCHAR(n) |
| TEXT | VARCHAR |
| TIMESTAMP | TIMESTAMP |
| DATE | DATE |
| BOOLEAN | BOOLEAN |
| JSON, JSONB | VARCHAR (as JSON string) |
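For tables with many columns, you can apply the mapping table mechanically. The sketch below is a hypothetical helper that translates PostgreSQL column types according to the table above and renders a Feldera `CREATE TABLE` statement. It only covers the listed types and maps TEXT to an unbounded VARCHAR, since TEXT has no length to carry over. It takes the primary key and NOT NULL flags as input rather than reading them from PostgreSQL. Like the users example, it leaves out UNIQUE and DEFAULT.

```python
from typing import List, Optional, Tuple

# Non-parameterized entries from the mapping table above.
SIMPLE = {
    "SERIAL": "INTEGER", "INTEGER": "INTEGER",
    "BIGSERIAL": "BIGINT", "BIGINT": "BIGINT",
    "REAL": "REAL", "DOUBLE PRECISION": "DOUBLE",
    "TEXT": "VARCHAR",  # TEXT has no length limit to carry over
    "TIMESTAMP": "TIMESTAMP", "DATE": "DATE", "BOOLEAN": "BOOLEAN",
    "JSON": "VARCHAR", "JSONB": "VARCHAR",  # arrives as a JSON string
}


def map_type(pg_type: str) -> str:
    """Map a PostgreSQL column type to a Feldera type, keeping length/precision."""
    normalized = " ".join(pg_type.upper().split())
    base, paren, params = normalized.partition("(")
    base = base.strip()
    suffix = paren + params  # e.g. "(100)" or "(10, 2)", or "" if absent
    if base == "VARCHAR":
        return "VARCHAR" + suffix
    if base == "NUMERIC":
        return "DECIMAL" + suffix
    if base in SIMPLE and not suffix:
        return SIMPLE[base]
    raise ValueError(f"no mapping for PostgreSQL type {pg_type!r}")


def feldera_ddl(table: str, columns: List[Tuple[str, str, bool]],
                primary_key: Optional[str] = None) -> str:
    """Render CREATE TABLE; columns are (name, PostgreSQL type, NOT NULL) triples."""
    lines = []
    for name, pg_type, not_null in columns:
        parts = [name, map_type(pg_type)]
        if not_null or name == primary_key:
            parts.append("NOT NULL")
        if name == primary_key:
            parts.append("PRIMARY KEY")
        lines.append("  " + " ".join(parts))
    return f"CREATE TABLE {table} (\n" + ",\n".join(lines) + "\n);"
```

Applied to the users columns with `primary_key="id"`, this produces the Feldera table shown above.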
Handling Initial Snapshots
When the Debezium connector starts without previously stored offsets, it takes an initial snapshot of the existing data (the default snapshot mode). These snapshot records have `op: "r"` and are treated as inserts by Feldera.
To ensure the pipeline starts from the beginning of the snapshot:
```yaml
transport:
  name: kafka_input
  config:
    bootstrap.servers: "localhost:9092"
    topics:
      - "dbserver1.public.users"
    auto.offset.reset: "earliest"  # Start from the beginning of the topic
```
Multiple Tables
Process multiple tables by configuring multiple input connectors:
```yaml
inputs:
  users:
    stream: users
    connector_config:
      transport:
        name: kafka_input
        config:
          bootstrap.servers: "localhost:9092"
          topics:
            - "dbserver1.public.users"
      format:
        name: json
        config:
          update_format: "debezium"
  orders:
    stream: orders
    connector_config:
      transport:
        name: kafka_input
        config:
          bootstrap.servers: "localhost:9092"
          topics:
            - "dbserver1.public.orders"
      format:
        name: json
        config:
          update_format: "debezium"
```
Joins and Aggregations
Once CDC data flows into Feldera, you can perform joins and aggregations:
```sql
-- Join users and orders
CREATE VIEW user_orders AS
SELECT
  u.name,
  u.email,
  o.order_id,
  o.amount,
  o.created_at
FROM users u
JOIN orders o ON u.id = o.user_id;

-- Aggregate order totals by user
-- (group by id so that users who share a name are not merged)
CREATE VIEW user_totals AS
SELECT
  u.id,
  u.name,
  COUNT(*) AS order_count,
  SUM(o.amount) AS total_amount
FROM users u
JOIN orders o ON u.id = o.user_id
GROUP BY u.id, u.name;
```
These views update incrementally as CDC events arrive. Feldera processes each change instead of recomputing the views from scratch.
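To make "incrementally" concrete, the sketch below maintains per-user order totals by hand from a stream of (weight, record) changes, such as those produced by the operation mapping described earlier. It is a simplified model for intuition, not Feldera's implementation; Feldera derives incremental computation from the SQL automatically. The class and method names are hypothetical, and the sketch assumes amounts arrive as decimal strings.

```python
from collections import defaultdict
from decimal import Decimal


class UserTotals:
    """Hand-maintained model of the user_totals view, grouped by user id."""

    def __init__(self):
        self.names = {}                      # current users: id -> name
        self.count = defaultdict(int)        # number of orders per user_id
        self.total = defaultdict(Decimal)    # SUM(amount) per user_id

    def apply_user(self, weight: int, row: dict) -> None:
        # +1 inserts or replaces the user, -1 removes it.
        if weight > 0:
            self.names[row["id"]] = row["name"]
        else:
            self.names.pop(row["id"], None)

    def apply_order(self, weight: int, row: dict) -> None:
        # Each order change touches only its own group: constant work per event,
        # no matter how many orders are already stored.
        self.count[row["user_id"]] += weight
        self.total[row["user_id"]] += weight * Decimal(row["amount"])

    def view(self) -> set:
        # Inner join: only users that exist and currently have at least one order.
        return {(uid, name, self.count[uid], self.total[uid])
                for uid, name in self.names.items() if self.count.get(uid, 0) > 0}
```

Deleting an order applies the same row with weight -1, which undoes its contribution without rescanning the other orders.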
Error Handling
Schema Mismatches
If the Debezium message schema doesn’t match your Feldera table, parsing errors occur. Check pipeline logs:
```python
for log_line in pipeline.logs():
    print(log_line)
```
Common issues:
- Missing columns in Feldera table
- Type mismatches between PostgreSQL and Feldera
- NULL values in NOT NULL columns
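When the logs point at a parsing error, it can help to compare a sample message with the table definition directly. The sketch below is a hypothetical helper. You describe the Feldera table by hand (`USERS_SCHEMA` is an assumption mirroring the users table above) and pass in the `after` object of a message copied from the topic. The helper reports the three kinds of issue listed above: columns missing from either side, type mismatches, and NULLs in NOT NULL columns.

```python
from typing import Any, Dict, List, Tuple

# Hypothetical, hand-written description of the Feldera users table:
# column -> (expected Python type after JSON parsing, nullable)
USERS_SCHEMA: Dict[str, Tuple[type, bool]] = {
    "id": (int, False),
    "name": (str, False),
    "email": (str, False),
    "created_at": (object, True),  # encoding depends on Debezium settings; not type-checked here
}


def find_schema_problems(record: Dict[str, Any], schema: Dict[str, Tuple[type, bool]]) -> List[str]:
    """Compare one Debezium `after` record with the expected table columns."""
    problems = []
    for column, (expected, nullable) in schema.items():
        if column not in record:
            problems.append(f"missing column {column!r}")
            continue
        value = record[column]
        if value is None:
            if not nullable:
                problems.append(f"NULL in NOT NULL column {column!r}")
        elif not isinstance(value, expected) or (expected is int and isinstance(value, bool)):
            # bool is a subclass of int in Python, so reject it explicitly.
            problems.append(f"column {column!r}: expected {expected.__name__}, "
                            f"got {type(value).__name__}")
    for column in sorted(record.keys() - schema.keys()):
        problems.append(f"column {column!r} is not in the Feldera table")
    return problems
```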
Kafka Connection Issues
If the Kafka connector cannot connect:
```python
stats = pipeline.input_connector_stats("users", "users")
print(stats.status)  # Check for errors
```
Verify:
- Kafka broker addresses are correct
- Topics exist and are accessible
- Network connectivity between Feldera and Kafka
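To test network connectivity, run a plain TCP check from the machine or container where the Feldera pipeline runs. The sketch below uses hypothetical helpers that split a `bootstrap.servers` string and try to open a TCP connection to each broker. A successful connection only shows that the port is open. It does not check authentication settings. Kafka clients also connect afterwards to the addresses the brokers advertise, so those addresses must be reachable from Feldera too.

```python
import socket
from typing import List, Tuple


def parse_bootstrap_servers(value: str) -> List[Tuple[str, int]]:
    """Split a bootstrap.servers string such as "kafka1:9092,kafka2:9092"."""
    servers = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue
        host, sep, port = entry.rpartition(":")
        if not sep:
            host, port = entry, "9092"  # assumption: default Kafka port when omitted
        servers.append((host.strip("[]"), int(port)))  # strip IPv6 brackets
    return servers


def unreachable_brokers(value: str, timeout: float = 3.0) -> List[str]:
    """Try a plain TCP connection to each broker; return the ones that fail."""
    failures = []
    for host, port in parse_bootstrap_servers(value):
        try:
            with socket.create_connection((host, port), timeout=timeout):
                pass
        except OSError as exc:
            failures.append(f"{host}:{port}: {exc}")
    return failures
```

An empty result from `unreachable_brokers("localhost:9092")` rules out basic network problems. If the connector still fails, check the topic names and the brokers' advertised listeners next.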
Performance Considerations
Batching
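The connector hands records to the pipeline in batches. `max_batch_size` limits how many records go into a single batch. `max_queued_records` sets how many records the connector may buffer before backpressure pauses it:
```yaml
connector_config:  # alongside transport and format
  max_batch_size: 10000
  max_queued_records: 1000000
```
Parallelism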
For high-throughput CDC streams, increase the number of worker threads in the pipeline's runtime configuration:
```json
{
  "workers": 16
}
```
Kafka Consumer Configuration
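You can also tune Kafka consumer settings. For example, the settings below let the broker hold each fetch request until at least 1 MiB (1048576 bytes) of data is available or 500 ms have elapsed. This trades some latency for larger fetches:
```yaml
transport:
  name: kafka_input
  config:
    bootstrap.servers: "localhost:9092"
    topics:
      - "dbserver1.public.users"
    fetch.min.bytes: "1048576"
    fetch.max.wait.ms: "500"
```
Complete Example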
Here’s a complete pipeline configuration for PostgreSQL CDC:
```sql
-- Define tables matching PostgreSQL schema
CREATE TABLE users (
  id INTEGER NOT NULL PRIMARY KEY,
  name VARCHAR(100) NOT NULL,
  email VARCHAR(100) NOT NULL,
  created_at TIMESTAMP
);

CREATE TABLE orders (
  order_id INTEGER NOT NULL PRIMARY KEY,
  user_id INTEGER NOT NULL,
  amount DECIMAL(10, 2) NOT NULL,
  created_at TIMESTAMP
);

-- Create a view for real-time analytics
CREATE VIEW user_order_summary AS
SELECT
  u.id,
  u.name,
  COUNT(o.order_id) AS order_count,    -- 0 for users without orders (COUNT(*) would give 1)
  SUM(o.amount) AS total_spent,        -- NULL for users without orders
  MAX(o.created_at) AS last_order_date
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
GROUP BY u.id, u.name;
```
Connector configuration:
```yaml
inputs:
  users:
    stream: users
    connector_config:
      transport:
        name: kafka_input
        config:
          bootstrap.servers: "kafka:9092"
          topics:
            - "dbserver1.public.users"
          auto.offset.reset: "earliest"
      format:
        name: json
        config:
          update_format: "debezium"
  orders:
    stream: orders
    connector_config:
      transport:
        name: kafka_input
        config:
          bootstrap.servers: "kafka:9092"
          topics:
            - "dbserver1.public.orders"
          auto.offset.reset: "earliest"
      format:
        name: json
        config:
          update_format: "debezium"
```
What’s Next
- Kafka Connector: Learn more about Kafka connector configuration
- Cloud Storage Connectors (S3/Delta Lake/Iceberg): Explore other data source options
- Connector Orchestration: Control connector startup order