Last Updated: 3/19/2026
NATS and Pub/Sub Connectors
Feldera supports ingesting data from NATS messaging systems and Google Cloud Pub/Sub. These connectors enable integration with cloud-native messaging platforms.
NATS Input Connector
NATS is a lightweight, high-performance messaging system. The NATS input connector subscribes to NATS subjects and ingests messages.
Basic Configuration
inputs:
  events:
    stream: events
    connector_config:
      transport:
        name: nats_input
        config:
          server: "nats://localhost:4222"
          subject: "events.>"
      format:
        name: json
Configuration Options
Connection:
- server: NATS server URL (e.g., "nats://localhost:4222")
- subject: Subject pattern to subscribe to (supports wildcards)
Authentication:
- username/password: Basic authentication
- token: Token-based authentication
- credentials_file: Path to NATS credentials file
Subscription:
- queue_group: Queue group name for load balancing
- durable_name: Durable subscription name (JetStream)
Wildcard Subscriptions
NATS supports wildcard subjects:
transport:
  name: nats_input
  config:
    server: "nats://localhost:4222"
    subject: "events.*"  # Match events.user, events.order, etc.
Wildcard types:
- *: Matches a single token (e.g., events.* matches events.user)
- >: Matches one or more tokens (e.g., events.> matches events.user.login)
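
The wildcard rules above can be sketched in a few lines of Python. This is an illustrative model of NATS subject matching, not the connector's or the NATS server's implementation. The function name `subject_matches` is hypothetical, and the sketch assumes that `>` is only valid as the final token of a pattern.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if `subject` matches the NATS-style `pattern`.

    Tokens are separated by '.'; '*' matches exactly one token and
    '>' (assumed to be the last token) matches one or more tokens.
    """
    pattern_tokens = pattern.split(".")
    subject_tokens = subject.split(".")

    for i, token in enumerate(pattern_tokens):
        if token == ">":
            # '>' must close the pattern and needs at least one token left.
            return i == len(pattern_tokens) - 1 and len(subject_tokens) > i
        if i >= len(subject_tokens):
            # The subject ran out of tokens before the pattern did.
            return False
        if token != "*" and token != subject_tokens[i]:
            # Literal token mismatch.
            return False

    # Without '>', pattern and subject must have the same number of tokens.
    return len(pattern_tokens) == len(subject_tokens)
```

For example, `subject_matches("events.*", "events.user.login")` is false because `*` covers only one token, while `subject_matches("events.>", "events.user.login")` is true.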
Queue Groups
Use queue groups for load balancing across multiple consumers:
transport:
  name: nats_input
  config:
    server: "nats://localhost:4222"
    subject: "events"
    queue_group: "processors"
JetStream Support
For durable subscriptions with JetStream:
transport:
  name: nats_input
  config:
    server: "nats://localhost:4222"
    subject: "events"
    durable_name: "event-processor"
Google Cloud Pub/Sub Input Connector
Google Cloud Pub/Sub is a fully managed messaging service. The Pub/Sub input connector subscribes to topics and ingests messages.
Basic Configuration
inputs:
  events:
    stream: events
    connector_config:
      transport:
        name: pubsub_input
        config:
          project_id: "my-project"
          subscription_id: "my-subscription"
      format:
        name: json
Configuration Options
Project and Subscription:
- project_id: GCP project ID
- subscription_id: Pub/Sub subscription ID
Authentication:
- credentials_file: Path to service account JSON file
- Uses Application Default Credentials if not specified
Performance:
- max_outstanding_messages: Maximum messages to buffer (default: 1000)
- max_outstanding_bytes: Maximum bytes to buffer (default: 100MB)
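
To make the two buffering limits concrete, here is a minimal sketch of how a subscriber might decide whether it can hold another message. It is not the connector's actual code. The class `OutstandingLimits` and its methods are hypothetical, and the sketch assumes that "100MB" means 100 * 1024 * 1024 bytes and that both limits must be satisfied at once.

```python
from dataclasses import dataclass


@dataclass
class OutstandingLimits:
    """Hypothetical tracker for buffered (not yet processed) messages."""

    max_outstanding_messages: int = 1000
    max_outstanding_bytes: int = 100 * 1024 * 1024  # assumption: 100MB as MiB
    messages: int = 0
    size_bytes: int = 0

    def can_accept(self, message_size: int) -> bool:
        # A new message fits only if neither limit would be exceeded.
        return (
            self.messages + 1 <= self.max_outstanding_messages
            and self.size_bytes + message_size <= self.max_outstanding_bytes
        )

    def add(self, message_size: int) -> bool:
        # Record a buffered message; return False if it had to be refused.
        if not self.can_accept(message_size):
            return False
        self.messages += 1
        self.size_bytes += message_size
        return True

    def release(self, message_size: int) -> None:
        # Call once a buffered message has been processed.
        self.messages -= 1
        self.size_bytes -= message_size
```

In this model, lowering either limit reduces memory use at the cost of pausing message delivery sooner when the pipeline falls behind.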
Creating Subscriptions
Create a Pub/Sub subscription before using the connector:
gcloud pubsub subscriptions create my-subscription \
  --topic=my-topic \
  --project=my-project
Authentication
Using service account:
transport:
  name: pubsub_input
  config:
    project_id: "my-project"
    subscription_id: "my-subscription"
    credentials_file: "/path/to/service-account.json"
Using Application Default Credentials:
transport:
  name: pubsub_input
  config:
    project_id: "my-project"
    subscription_id: "my-subscription"
Set the GOOGLE_APPLICATION_CREDENTIALS environment variable to the service account file path.
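
The choice between the two authentication modes can be summarized as a lookup order. The sketch below is an assumption about how that order might be modeled, not the connector's implementation. The helper `resolve_credentials_path` is hypothetical, and it covers only the explicit file and the environment variable, not the other sources that Application Default Credentials can fall back to.

```python
import os
from typing import Mapping, Optional


def resolve_credentials_path(
    config: Mapping[str, str],
    environ: Mapping[str, str] = os.environ,
) -> Optional[str]:
    """Pick the service account file to use, if any.

    An explicit `credentials_file` in the connector config wins;
    otherwise fall back to GOOGLE_APPLICATION_CREDENTIALS.
    Returns None when neither is set.
    """
    explicit = config.get("credentials_file")
    if explicit:
        return explicit
    return environ.get("GOOGLE_APPLICATION_CREDENTIALS") or None
```

Passing the environment as a parameter keeps the function easy to exercise without modifying the real process environment.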
Message Attributes
Pub/Sub message attributes are available in the data:
{
  "data": {"user_id": 1, "action": "login"},
  "attributes": {
    "source": "web",
    "timestamp": "2024-01-15T10:00:00Z"
  }
}
Use Cases
Event-Driven Architectures
Ingest events from microservices:
inputs:
  user_events:
    stream: events
    connector_config:
      transport:
        name: nats_input
        config:
          server: "nats://nats:4222"
          subject: "user.>"
      format:
        name: json
Cloud-Native Applications
Integrate with GCP services:
inputs:
  logs:
    stream: logs
    connector_config:
      transport:
        name: pubsub_input
        config:
          project_id: "my-project"
          subscription_id: "log-processor"
      format:
        name: json
Real-Time Analytics
Process streaming data from NATS:
CREATE TABLE events (
    event_id BIGINT,
    event_time TIMESTAMP,
    user_id BIGINT,
    action VARCHAR
);
CREATE MATERIALIZED VIEW event_counts AS
SELECT
    action,
    COUNT(*) AS event_count
FROM events
GROUP BY action;
Performance Considerations
Batching
Both connectors batch messages for efficiency. Control batch size:
connector_config:
  max_batch_size: 10000
  max_queued_records: 1000000
Parallelism
Increase pipeline workers for higher throughput:
{
  "workers": 16
}
Backpressure
Configure backpressure thresholds:
connector_config:
  max_queued_records: 1000000
What's Next
- Kafka Connector: Learn about Kafka input
- Connector Orchestration: Control connector startup
- Pipeline Configuration: Optimize performance