Internal site. Jolli authentication required to view.
Skip to Content
🔌 ConnectorsKafka Connector

Last Updated: 3/19/2026


Kafka Connector

The Kafka connector enables Feldera pipelines to read from and write to Apache Kafka topics. It supports both input (consuming messages) and output (producing messages) operations, with full support for authentication, fault tolerance, and advanced configuration options.

Overview

Kafka is one of the most commonly used connectors in Feldera. The connector uses the librdkafka library under the hood and exposes most of its configuration options directly to users. This allows fine-grained control over consumer and producer behavior while maintaining sensible defaults for common use cases.

Input Configuration

To configure Kafka as an input source, you specify the topic to read from and connection parameters. The connector can start reading from different positions in the topic and supports parallel consumption from multiple partitions.

Basic Input Example

{ "transport": { "name": "kafka_input", "config": { "bootstrap.servers": "localhost:9092", "topic": "my-input-topic", "auto.offset.reset": "earliest" } } }

Bootstrap Servers

The bootstrap.servers option specifies the initial Kafka brokers to connect to. If not provided, the connector defaults to localhost or the value of the REDPANDA_BROKERS environment variable. You can specify multiple brokers as a comma-separated list:

{ "bootstrap.servers": "broker1:9092,broker2:9092,broker3:9092" }

Topic Selection

The topic field specifies which Kafka topic to consume from. This is a required field for input connectors.

Starting Position

The start_from option controls where the connector begins reading:

  • earliest: Start from the beginning of the topic
  • latest: Start from the current end (default)
  • offsets: Start from specific offsets per partition
  • timestamp: Start from a particular timestamp (milliseconds since epoch)

Example with specific offsets:

{ "topic": "my-topic", "start_from": { "offsets": [100, 200, 300] }, "partitions": [0, 1, 2] }

Partition Selection

By default, the connector consumes from all partitions in the topic. You can limit consumption to specific partitions using the partitions field:

{ "topic": "my-topic", "partitions": [0, 2, 4] }

Consumer Groups

The Kafka connector does not use consumer groups for offset management. Instead, it generates a unique group.id for each connector instance. The connector manages offsets internally, especially when fault tolerance is enabled.

Security and Authentication

The connector supports all librdkafka security mechanisms through the kafka_options field. Common configurations include:

SASL/PLAIN Authentication:

{ "bootstrap.servers": "broker:9092", "security.protocol": "SASL_SSL", "sasl.mechanism": "PLAIN", "sasl.username": "my-username", "sasl.password": "my-password" }

SSL/TLS:

{ "security.protocol": "SSL", "ssl.ca.location": "/path/to/ca-cert", "ssl.certificate.location": "/path/to/client-cert", "ssl.key.location": "/path/to/client-key" }

AWS MSK with IAM Authentication:

For AWS Managed Streaming for Kafka (MSK), use OAUTHBEARER with IAM:

{ "bootstrap.servers": "my-cluster.kafka.us-east-1.amazonaws.com:9098", "security.protocol": "SASL_SSL", "sasl.mechanism": "OAUTHBEARER", "region": "us-east-1" }

The connector automatically generates AWS MSK IAM tokens when sasl.mechanism is set to OAUTHBEARER. The region field is required for MSK connections.

Performance Tuning

Poller Threads:

The poller_threads option controls how many threads poll Kafka for messages. The default is 3, which works well for small messages. For large messages, setting this to 1 may improve performance:

{ "poller_threads": 1 }

Partition Synchronization:

When using Feldera’s lateness feature with multiple partitions, enable synchronize_partitions to ensure events are processed in timestamp order across partitions:

{ "synchronize_partitions": true }

This prevents out-of-order processing but requires that all partitions have data and that Kafka timestamps are monotonically increasing.

Metadata Access

The connector can include Kafka metadata in each record, accessible via the CONNECTOR_METADATA() SQL function:

{ "include_headers": true, "include_timestamp": true, "include_partition": true, "include_offset": true, "include_topic": true }

Fault Tolerance

When a pipeline is configured with fault tolerance, the Kafka input connector automatically checkpoints its position. On restart after a failure, it resumes from the last checkpoint.

If the data at the checkpoint offset has expired from Kafka (due to retention policies), the connector will fail by default. Set resume_earliest_if_data_expires to true to start from the earliest available offset instead:

{ "resume_earliest_if_data_expires": true }

Output Configuration

The Kafka output connector writes pipeline results to a Kafka topic. It supports transactional writes for exactly-once semantics when fault tolerance is enabled.

Basic Output Example

{ "transport": { "name": "kafka_output", "config": { "bootstrap.servers": "localhost:9092", "topic": "my-output-topic" } } }

Message Headers

You can attach custom headers to all messages produced by the connector:

{ "topic": "my-output-topic", "headers": [ { "key": "source", "value": "feldera-pipeline" }, { "key": "version", "value": [1, 0, 0] } ] }

Header values can be strings or byte arrays.

Producer Configuration

All librdkafka producer options can be specified in kafka_options. The connector enforces certain settings for reliability:

  • acks: Set to all for durability
  • enable.idempotence: Set to true to prevent duplicates
  • retries: Set to 5 for automatic retry on transient failures

Fault-Tolerant Output

When fault tolerance is enabled, the output connector uses Kafka transactions to ensure exactly-once delivery. Configure separate consumer and producer options if needed:

{ "topic": "my-output-topic", "fault_tolerance": { "consumer_options": { "fetch.min.bytes": "1024" }, "producer_options": { "compression.type": "snappy" } } }

The connector records step numbers as message keys and uses them to discard duplicate output on recovery.

Initialization Timeout

The initialization_timeout_secs option (default: 60) controls how long the connector waits to connect to Kafka brokers during startup:

{ "initialization_timeout_secs": 120 }

Advanced Options

Log Level

Control the verbosity of librdkafka logging:

{ "log_level": "info" }

Valid levels: emerg, alert, critical, error, warning, notice, info, debug.

SSL Certificate from PEM

The connector supports providing SSL certificates as PEM strings instead of file paths. Use ssl.certificate.pem instead of ssl.certificate.location:

{ "ssl.certificate.pem": "-----BEGIN CERTIFICATE-----\n..." }

The connector automatically saves the PEM content to a temporary file and configures librdkafka to use it.

All librdkafka Options

Any option documented in the librdkafka configuration reference  can be passed through kafka_options. The connector validates that certain options are not overridden when they conflict with Feldera’s requirements.

Common Patterns

What’s Next

  • Debezium Connector: Learn how to use Debezium with Feldera for change data capture from relational databases.
  • Connectors Overview: Explore all available Feldera connectors and how to choose the right one for your pipeline.

Reading from Multiple Topics

Create separate input connectors for each topic, each with its own configuration.

Compacted Topics

For Kafka compacted topics, start from earliest to ensure you read all keys:

{ "topic": "compacted-topic", "start_from": "earliest" }

High-Throughput Scenarios

For maximum throughput:

{ "poller_threads": 4, "kafka_options": { "fetch.min.bytes": "1048576", "fetch.wait.max.ms": "500" } }

Schema Registry Integration

While the Kafka connector itself doesn’t integrate with Schema Registry, you can use Feldera’s format options (Avro, JSON) alongside Kafka to handle schema-encoded data.