Last Updated: 3/19/2026
Kafka Connector
The Kafka connector enables Feldera pipelines to read from and write to Apache Kafka topics. It supports both input (consuming messages) and output (producing messages) operations, with full support for authentication, fault tolerance, and advanced configuration options.
Overview
Kafka is one of the most commonly used connectors in Feldera. The connector uses the librdkafka library under the hood and exposes most of its configuration options directly to users. This allows fine-grained control over consumer and producer behavior while maintaining sensible defaults for common use cases.
Input Configuration
To configure Kafka as an input source, you specify the topic to read from and connection parameters. The connector can start reading from different positions in the topic and supports parallel consumption from multiple partitions.
Basic Input Example
{
  "transport": {
    "name": "kafka_input",
    "config": {
      "bootstrap.servers": "localhost:9092",
      "topic": "my-input-topic",
      "auto.offset.reset": "earliest"
    }
  }
}
Bootstrap Servers
The bootstrap.servers option specifies the initial Kafka brokers to connect to. If it is not provided, the connector uses the value of the REDPANDA_BROKERS environment variable when that variable is set, and localhost otherwise. To specify multiple brokers, separate them with commas:
{
  "bootstrap.servers": "broker1:9092,broker2:9092,broker3:9092"
}
Topic Selection
The topic field specifies which Kafka topic to consume from. This is a required field for input connectors.
Starting Position
The start_from option controls where the connector begins reading:
- earliest: Start from the beginning of the topic
- latest: Start from the current end of the topic (default)
- offsets: Start from specific offsets per partition
- timestamp: Start from a particular timestamp (milliseconds since the Unix epoch)
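The four start_from forms can be sketched as Python values ready to serialize into a connector config. The string forms and the {"offsets": [...]} object follow the examples on this page; the {"timestamp": ...} object shape and the helper name start_from_timestamp are assumptions for illustration. The helper also shows an exact way to turn a UTC datetime into milliseconds since the epoch.

```python
import json
from datetime import datetime, timedelta, timezone

# Unix epoch as a timezone-aware datetime, used for exact millisecond arithmetic.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def start_from_timestamp(dt: datetime) -> dict:
    """Hypothetical helper: build a start_from value for a point in time.

    Assumes the {"timestamp": <milliseconds since epoch>} object shape.
    """
    if dt.tzinfo is None:
        raise ValueError("pass a timezone-aware datetime to avoid local-time surprises")
    # timedelta // timedelta yields an exact integer count of milliseconds.
    return {"timestamp": (dt - EPOCH) // timedelta(milliseconds=1)}


# One example of each start_from form listed above.
examples = {
    "earliest": "earliest",
    "latest": "latest",  # same as omitting start_from
    "offsets": {"offsets": [100, 200, 300]},
    "timestamp": start_from_timestamp(datetime(2024, 1, 1, tzinfo=timezone.utc)),
}

for form, value in examples.items():
    print(form, json.dumps({"start_from": value}))
```

Rejecting naive datetimes is a deliberate choice: a naive value would otherwise be interpreted in the machine's local time zone and silently shift the starting position.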
Example with specific offsets:
{
  "topic": "my-topic",
  "start_from": {
    "offsets": [100, 200, 300]
  },
  "partitions": [0, 1, 2]
}
Partition Selection
By default, the connector consumes from all partitions in the topic. You can limit consumption to specific partitions using the partitions field:
{
  "topic": "my-topic",
  "partitions": [0, 2, 4]
}
Consumer Groups
The Kafka connector does not use consumer groups for offset management. Instead, it generates a unique group.id for each connector instance. The connector manages offsets internally, especially when fault tolerance is enabled.
Security and Authentication
The connector supports all librdkafka security mechanisms. Like other librdkafka properties (kafka_options), security settings are written directly in the connector's config object. Common configurations include:
SASL/PLAIN Authentication:
{
  "bootstrap.servers": "broker:9092",
  "security.protocol": "SASL_SSL",
  "sasl.mechanism": "PLAIN",
  "sasl.username": "my-username",
  "sasl.password": "my-password"
}
SSL/TLS:
{
  "security.protocol": "SSL",
  "ssl.ca.location": "/path/to/ca-cert",
  "ssl.certificate.location": "/path/to/client-cert",
  "ssl.key.location": "/path/to/client-key"
}
AWS MSK with IAM Authentication:
For Amazon Managed Streaming for Apache Kafka (MSK), use the OAUTHBEARER SASL mechanism with IAM:
{
  "bootstrap.servers": "my-cluster.kafka.us-east-1.amazonaws.com:9098",
  "security.protocol": "SASL_SSL",
  "sasl.mechanism": "OAUTHBEARER",
  "region": "us-east-1"
}
The connector automatically generates AWS MSK IAM tokens when sasl.mechanism is set to OAUTHBEARER. The region field is required for MSK connections.
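A small pre-flight check can catch a missing region before the pipeline starts. The sketch below assembles the MSK IAM config shown above and applies the rule just stated; both function names are hypothetical, and this is not the connector's own validation code.

```python
import json


def msk_iam_config(bootstrap_servers: str, topic: str, region: str) -> dict:
    """Hypothetical helper: Kafka input config for AWS MSK with IAM authentication."""
    return {
        "bootstrap.servers": bootstrap_servers,
        "topic": topic,
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "OAUTHBEARER",
        "region": region,
    }


def missing_msk_fields(config: dict) -> list:
    """Return required-but-missing fields; an empty list means the check passed.

    Mirrors the rule stated above: region is required when the connector
    generates MSK IAM tokens (sasl.mechanism = OAUTHBEARER).
    """
    if config.get("sasl.mechanism") == "OAUTHBEARER" and not config.get("region"):
        return ["region"]
    return []


config = msk_iam_config(
    "my-cluster.kafka.us-east-1.amazonaws.com:9098", "my-input-topic", "us-east-1"
)
print(missing_msk_fields(config))  # [] when region is set
print(json.dumps(config, indent=2))
```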
Performance Tuning
Poller Threads:
The poller_threads option controls how many threads poll Kafka for messages. The default is 3, which works well for small messages. For large messages, setting this to 1 may improve performance:
{
  "poller_threads": 1
}
Partition Synchronization:
When using Feldera’s lateness feature with multiple partitions, enable synchronize_partitions to ensure events are processed in timestamp order across partitions:
{
  "synchronize_partitions": true
}
This prevents out-of-order processing but requires that all partitions have data and that Kafka timestamps are monotonically increasing.
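Why does synchronization need every partition to have data? The simplified Python model below merges per-partition records by timestamp and releases a record only while every partition has a pending record to compare against. It is a sketch for intuition under the stated assumptions (each partition's timestamps never decrease), not Feldera's implementation; the function name is hypothetical.

```python
def release_in_order(partitions):
    """Release (timestamp, partition, payload) records in global timestamp order.

    Simplified model: partitions maps a partition id to the records received so
    far, each a (timestamp, payload) pair sorted by timestamp. A record is released
    only while every partition has a pending record; as soon as one partition runs
    dry, the merge must wait for it, since its next record could be earlier.
    """
    cursors = {p: 0 for p in partitions}
    released = []
    while partitions and all(cursors[p] < len(recs) for p, recs in partitions.items()):
        # The smallest head timestamp is safe to release, provided each partition's
        # timestamps never decrease.
        ts, p = min((partitions[p][cursors[p]][0], p) for p in partitions)
        released.append((ts, p, partitions[p][cursors[p]][1]))
        cursors[p] += 1
    return released


received = {
    0: [(1, "a"), (4, "d")],
    1: [(2, "b"), (3, "c"), (5, "e")],
}
print(release_in_order(received))
# Record (5, "e") stays buffered: partition 0 has nothing left to compare against.
```

An idle partition stalls the whole merge in this model, which is the intuition behind the requirement that all partitions have data.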
Metadata Access
The connector can include Kafka metadata in each record, accessible via the CONNECTOR_METADATA() SQL function:
{
  "include_headers": true,
  "include_timestamp": true,
  "include_partition": true,
  "include_offset": true,
  "include_topic": true
}
Fault Tolerance
When a pipeline is configured with fault tolerance, the Kafka input connector automatically checkpoints its position. On restart after a failure, it resumes from the last checkpoint.
If the data at the checkpoint offset has expired from Kafka (due to retention policies), the connector will fail by default. Set resume_earliest_if_data_expires to true to start from the earliest available offset instead:
{
  "resume_earliest_if_data_expires": true
}
Output Configuration
The Kafka output connector writes pipeline results to a Kafka topic. It supports transactional writes for exactly-once semantics when fault tolerance is enabled.
Basic Output Example
{
  "transport": {
    "name": "kafka_output",
    "config": {
      "bootstrap.servers": "localhost:9092",
      "topic": "my-output-topic"
    }
  }
}
Message Headers
You can attach custom headers to all messages produced by the connector:
{
  "topic": "my-output-topic",
  "headers": [
    {
      "key": "source",
      "value": "feldera-pipeline"
    },
    {
      "key": "version",
      "value": [1, 0, 0]
    }
  ]
}
Header values can be strings or byte arrays.
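Because JSON has no bytes type, a byte-array header value is written as a list of integers, as in the version header above. A minimal Python sketch that builds the headers list from str and bytes values; the helper name header is hypothetical.

```python
import json


def header(key: str, value) -> dict:
    """Hypothetical helper: one entry for the connector's headers list.

    Strings are passed through unchanged; bytes become a list of integers
    (0-255), matching the byte-array form shown above.
    """
    if isinstance(value, (bytes, bytearray)):
        return {"key": key, "value": list(value)}
    if isinstance(value, str):
        return {"key": key, "value": value}
    raise TypeError("header values must be str or bytes")


config = {
    "topic": "my-output-topic",
    "headers": [
        header("source", "feldera-pipeline"),
        header("version", bytes([1, 0, 0])),
    ],
}
print(json.dumps(config))
```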
Producer Configuration
All librdkafka producer options can be specified in kafka_options. The connector enforces certain settings for reliability:
- acks: Set to all for durability
- enable.idempotence: Set to true to prevent duplicates
- retries: Set to 5 for automatic retry on transient failures
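The settings listed above can be turned into a simple lint for user-supplied producer options. The sketch flags values that differ from that list. Whether the connector rejects, overrides, or warns about such a conflict is not specified here, so treat this as an assumption-based check; the names are hypothetical.

```python
# Reliability settings listed above, written as librdkafka string values.
RELIABILITY_SETTINGS = {
    "acks": "all",
    "enable.idempotence": "true",
    "retries": "5",
}


def conflicting_options(user_options: dict) -> dict:
    """Map each conflicting key to a (user value, listed value) pair.

    Plain case-insensitive string comparison: equivalent spellings such as
    acks=-1 (which librdkafka treats like acks=all) are still reported.
    """
    return {
        key: (user_options[key], expected)
        for key, expected in RELIABILITY_SETTINGS.items()
        if key in user_options and str(user_options[key]).lower() != expected
    }


print(conflicting_options({"acks": "1", "compression.type": "snappy", "retries": "5"}))
```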
Fault-Tolerant Output
When fault tolerance is enabled, the output connector uses Kafka transactions to ensure exactly-once delivery. Configure separate consumer and producer options if needed:
{
  "topic": "my-output-topic",
  "fault_tolerance": {
    "consumer_options": {
      "fetch.min.bytes": "1024"
    },
    "producer_options": {
      "compression.type": "snappy"
    }
  }
}
The connector records step numbers as message keys and uses them to discard duplicate output on recovery.
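To make the recovery rule concrete, the sketch below models it: each output record carries the step that produced it, and after a restart any replayed step at or below the last step already present in the output topic is skipped instead of being sent again. This is a simplified model of the behavior described above with hypothetical names, not the connector's actual code or key encoding.

```python
from typing import List, Optional, Tuple


def filter_replayed_output(
    replayed: List[Tuple[int, str]], last_written_step: Optional[int]
) -> List[Tuple[int, str]]:
    """Drop (step, payload) records whose step already reached the output topic.

    last_written_step is the highest step found in the topic after recovery,
    or None if the topic holds no output from this pipeline yet.
    """
    if last_written_step is None:
        return list(replayed)
    return [(step, payload) for step, payload in replayed if step > last_written_step]


# After a crash, the pipeline recomputes steps 7-9, but step 7 was already written.
print(filter_replayed_output([(7, "a"), (8, "b"), (9, "c")], last_written_step=7))
```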
Initialization Timeout
The initialization_timeout_secs option (default: 60) controls how long the connector waits to connect to Kafka brokers during startup:
{
  "initialization_timeout_secs": 120
}
Advanced Options
Log Level
Control the verbosity of librdkafka logging:
{
  "log_level": "info"
}
Valid levels: emerg, alert, critical, error, warning, notice, info, debug.
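A typo in log_level is easy to catch before deploying. The sketch checks a requested level against the list above and writes it in the lowercase spelling shown there; the helper name with_log_level is hypothetical.

```python
# Levels accepted by log_level, as listed above (most to least severe).
VALID_LOG_LEVELS = ("emerg", "alert", "critical", "error", "warning", "notice", "info", "debug")


def with_log_level(config: dict, level: str) -> dict:
    """Hypothetical helper: return a copy of config with a validated log_level.

    The level is normalized to lowercase, the spelling used in the list above.
    """
    level = level.lower()
    if level not in VALID_LOG_LEVELS:
        raise ValueError(
            f"invalid log_level {level!r}; expected one of {', '.join(VALID_LOG_LEVELS)}"
        )
    return {**config, "log_level": level}


print(with_log_level({"topic": "my-input-topic"}, "Debug"))
```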
SSL Certificate from PEM
The connector supports providing SSL certificates as PEM strings instead of file paths. Use ssl.certificate.pem instead of ssl.certificate.location:
{
  "ssl.certificate.pem": "-----BEGIN CERTIFICATE-----\n..."
}
The connector automatically saves the PEM content to a temporary file and configures librdkafka to use it.
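PEM files span several lines, so embedding one in a JSON config requires escaping each newline as \n, as in the example above. Python's json module does this automatically. In the sketch, the helper name is hypothetical and the certificate body is a placeholder, not a real certificate.

```python
import json
import tempfile
from pathlib import Path


def pem_config_entry(pem_path: str) -> str:
    """Hypothetical helper: JSON fragment with a file's PEM text as ssl.certificate.pem.

    json.dumps escapes the embedded newlines, yielding a single-line JSON string.
    """
    pem_text = Path(pem_path).read_text(encoding="ascii")
    return json.dumps({"ssl.certificate.pem": pem_text})


# Demonstration with a throwaway file holding a placeholder certificate body.
with tempfile.TemporaryDirectory() as tmp:
    demo = Path(tmp) / "client-cert.pem"
    demo.write_text(
        "-----BEGIN CERTIFICATE-----\nMIIB...\n-----END CERTIFICATE-----\n", encoding="ascii"
    )
    fragment = pem_config_entry(str(demo))
    print(fragment)
```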
All librdkafka Options
Any option documented in the librdkafka configuration reference can be passed through kafka_options. However, the connector does not allow options that Feldera depends on to be overridden with conflicting values.
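In the security examples above, librdkafka properties such as security.protocol sit directly in the connector's config object next to Feldera fields such as topic, with values written as strings. The sketch merges the two groups into one flat object and stringifies librdkafka values, since librdkafka takes its configuration as strings. The helper name is hypothetical, and the split between "Feldera fields" and "librdkafka options" is the caller's choice.

```python
import json


def connector_config(feldera_fields: dict, kafka_options: dict) -> dict:
    """Merge Feldera fields with librdkafka properties into one flat config object.

    librdkafka values are stringified (booleans as "true"/"false").
    Raises if a key appears in both dicts, to avoid silently shadowing a setting.
    """
    overlap = feldera_fields.keys() & kafka_options.keys()
    if overlap:
        raise ValueError(f"keys set twice: {sorted(overlap)}")

    def to_rdkafka_string(value) -> str:
        if isinstance(value, bool):  # check bool before str(): bool is a subclass of int
            return "true" if value else "false"
        return str(value)

    flat = dict(feldera_fields)
    flat.update({key: to_rdkafka_string(v) for key, v in kafka_options.items()})
    return flat


config = connector_config(
    {"topic": "my-input-topic", "poller_threads": 4},
    {"bootstrap.servers": "localhost:9092", "fetch.min.bytes": 1048576, "fetch.wait.max.ms": 500},
)
print(json.dumps(config, indent=2))
```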
Common Patterns
Reading from Multiple Topics
Create separate input connectors for each topic, each with its own configuration.
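Generating the per-topic connectors programmatically keeps shared settings in one place. The sketch builds one kafka_input connector per topic in the transport/config envelope from the basic example; the helper name is hypothetical, and how the resulting list is attached to a table is outside this section's scope.

```python
import copy
import json


def per_topic_connectors(topics: list, shared: dict) -> list:
    """Hypothetical helper: one kafka_input connector per topic.

    Each connector gets a deep copy of the shared settings, so a later
    per-topic tweak (even to a nested value) does not leak into the others.
    """
    return [
        {"transport": {"name": "kafka_input", "config": {**copy.deepcopy(shared), "topic": topic}}}
        for topic in topics
    ]


connectors = per_topic_connectors(
    ["orders", "payments"],
    {"bootstrap.servers": "localhost:9092", "start_from": "earliest"},
)
print(json.dumps(connectors, indent=2))
```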
Compacted Topics
For Kafka compacted topics, start from earliest to ensure you read all keys:
{
  "topic": "compacted-topic",
  "start_from": "earliest"
}
High-Throughput Scenarios
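For higher throughput, add poller threads and have the broker return larger batches per fetch (it waits up to fetch.wait.max.ms for fetch.min.bytes of data). As in the other examples, librdkafka properties go directly in the connector config:
{
  "poller_threads": 4,
  "fetch.min.bytes": "1048576",
  "fetch.wait.max.ms": "500"
}
Schema Registry Integration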
While the Kafka connector itself doesn’t integrate with Schema Registry, you can use Feldera’s format options (Avro, JSON) alongside Kafka to handle schema-encoded data.