
Last Updated: 3/19/2026


NATS and Pub/Sub Connectors

Feldera supports ingesting data from NATS messaging systems and Google Cloud Pub/Sub. These connectors enable integration with cloud-native messaging platforms.

NATS Input Connector

NATS is a lightweight, high-performance messaging system. The NATS input connector subscribes to NATS subjects and ingests messages.

Basic Configuration

    inputs:
      events:
        stream: events
        connector_config:
          transport:
            name: nats_input
            config:
              server: "nats://localhost:4222"
              subject: "events.>"
          format:
            name: json

Configuration Options

Connection:

  • server - NATS server URL (e.g., "nats://localhost:4222")
  • subject - Subject pattern to subscribe to (supports wildcards)

Authentication:

  • username / password - Basic authentication
  • token - Token-based authentication
  • credentials_file - Path to NATS credentials file

Subscription:

  • queue_group - Queue group name for load balancing
  • durable_name - Durable subscription name (JetStream)

Wildcard Subscriptions

NATS supports wildcard subjects:

    transport:
      name: nats_input
      config:
        server: "nats://localhost:4222"
        subject: "events.*"  # Match events.user, events.order, etc.

Wildcard types:

  • * - Matches exactly one token (e.g., events.* matches events.user but not events.user.login)
  • > - Matches one or more trailing tokens and must be the last token (e.g., events.> matches events.user.login but not events)
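The wildcard rules above can be sketched as a small matcher. This is an illustration of the subject-matching semantics only, not code taken from the connector; the function name `subject_matches` is hypothetical, and the sketch does not validate subjects (for example, it does not reject empty tokens).

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if `subject` matches the NATS-style `pattern`."""
    pattern_tokens = pattern.split(".")
    subject_tokens = subject.split(".")

    for i, token in enumerate(pattern_tokens):
        if token == ">":
            # ">" must be the last pattern token and needs at least one
            # remaining subject token to match.
            return i == len(pattern_tokens) - 1 and len(subject_tokens) > i
        if i >= len(subject_tokens):
            # The pattern has more tokens than the subject.
            return False
        if token != "*" and token != subject_tokens[i]:
            # A literal token must match exactly; "*" matches any one token.
            return False

    # Without a trailing ">", both sides must have the same number of tokens.
    return len(pattern_tokens) == len(subject_tokens)
```

A matcher like this can be handy for checking, before deployment, which published subjects a configured `subject` pattern would pick up.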

Queue Groups

Use queue groups for load balancing across multiple consumers:

    transport:
      name: nats_input
      config:
        server: "nats://localhost:4222"
        subject: "events"
        queue_group: "processors"
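The effect of a queue group can be illustrated with a toy model: each message published to the subject is handed to exactly one member of the group, rather than to every subscriber. The function name `deliver_to_queue_group` and the member names are hypothetical, and uniform random selection is an assumption standing in for whatever policy the NATS server applies.

```python
import random
from collections import defaultdict


def deliver_to_queue_group(messages, members, rng=None):
    """Assign each message to exactly one queue group member (toy model)."""
    rng = rng or random.Random()
    assignments = defaultdict(list)
    for message in messages:
        # Assumption: the server picks one member at random per message.
        assignments[rng.choice(members)].append(message)
    return dict(assignments)
```

In this model, running several consumers with the same `queue_group` splits the stream between them, so no message is processed twice within the group.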

JetStream Support

For durable subscriptions with JetStream:

    transport:
      name: nats_input
      config:
        server: "nats://localhost:4222"
        subject: "events"
        durable_name: "event-processor"

Google Cloud Pub/Sub Input Connector

Google Cloud Pub/Sub is a fully managed messaging service. The Pub/Sub input connector reads messages from an existing subscription, identified by subscription_id, and ingests them.

Basic Configuration

    inputs:
      events:
        stream: events
        connector_config:
          transport:
            name: pubsub_input
            config:
              project_id: "my-project"
              subscription_id: "my-subscription"
          format:
            name: json

Configuration Options

Project and Subscription:

  • project_id - GCP project ID
  • subscription_id - Pub/Sub subscription ID

Authentication:

  • credentials_file - Path to service account JSON file
  • If credentials_file is not specified, the connector uses Application Default Credentials

Performance:

  • max_outstanding_messages - Maximum messages to buffer (default: 1000)
  • max_outstanding_bytes - Maximum bytes to buffer (default: 100MB)
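How the two limits interact can be sketched as a simple admission check: a message is buffered only while both the message count and the byte total stay within bounds. This is a toy model rather than the connector's implementation; the class `FlowControl` and its methods are hypothetical, and treating 100MB as 100,000,000 bytes is an assumption.

```python
from dataclasses import dataclass


@dataclass
class FlowControl:
    """Toy model of message-count and byte-size buffering limits."""

    max_outstanding_messages: int = 1000
    max_outstanding_bytes: int = 100 * 1000 * 1000  # assumption: 100MB = 10^8 bytes
    outstanding_messages: int = 0
    outstanding_bytes: int = 0

    def try_add(self, size: int) -> bool:
        """Buffer a message of `size` bytes if both limits allow it."""
        if self.outstanding_messages + 1 > self.max_outstanding_messages:
            return False
        if self.outstanding_bytes + size > self.max_outstanding_bytes:
            return False
        self.outstanding_messages += 1
        self.outstanding_bytes += size
        return True

    def release(self, size: int) -> None:
        """Free the slot held by a processed message of `size` bytes."""
        self.outstanding_messages = max(0, self.outstanding_messages - 1)
        self.outstanding_bytes = max(0, self.outstanding_bytes - size)
```

In this model, whichever limit is reached first stops further buffering, so many small messages are capped by the count limit and a few large ones by the byte limit.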

Creating Subscriptions

Create a Pub/Sub subscription before using the connector:

    gcloud pubsub subscriptions create my-subscription \
      --topic=my-topic \
      --project=my-project

Authentication

Using service account:

    transport:
      name: pubsub_input
      config:
        project_id: "my-project"
        subscription_id: "my-subscription"
        credentials_file: "/path/to/service-account.json"

Using Application Default Credentials:

    transport:
      name: pubsub_input
      config:
        project_id: "my-project"
        subscription_id: "my-subscription"

When relying on Application Default Credentials, set the GOOGLE_APPLICATION_CREDENTIALS environment variable to the service account file path.
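The precedence between the two authentication options can be sketched as follows. This is a simplified illustration, not the connector's code: the helper `resolve_credentials_path` is hypothetical, and it models only the first step of Application Default Credentials (the environment variable). A return value of None stands for "fall through to the remaining ADC sources", such as credentials attached to the runtime environment.

```python
import os
from typing import Mapping, Optional


def resolve_credentials_path(
    config: Mapping[str, str],
    env: Optional[Mapping[str, str]] = None,
) -> Optional[str]:
    """Pick the service account file the connector would be pointed at."""
    env = os.environ if env is None else env

    # An explicit credentials_file in the connector config takes priority.
    explicit = config.get("credentials_file")
    if explicit:
        return explicit

    # Otherwise fall back to the ADC environment variable, if it is set.
    return env.get("GOOGLE_APPLICATION_CREDENTIALS") or None
```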

Message Attributes

Pub/Sub message attributes are available in the data:

    {
      "data": {"user_id": 1, "action": "login"},
      "attributes": {
        "source": "web",
        "timestamp": "2024-01-15T10:00:00Z"
      }
    }
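To make the envelope shape above concrete, the following sketch parses such a message and merges the attributes into a single flat record. The function `flatten_envelope` and the `attr_` prefix are hypothetical choices for illustration; whether and how attributes should be mapped to table columns depends on the pipeline's schema.

```python
import json


def flatten_envelope(raw: str, prefix: str = "attr_") -> dict:
    """Merge the payload and attributes of a message envelope into one dict."""
    envelope = json.loads(raw)
    # Start from the payload fields under "data".
    record = dict(envelope.get("data", {}))
    # Add each attribute under a prefixed key to avoid clashing with payload fields.
    for key, value in envelope.get("attributes", {}).items():
        record[prefix + key] = value
    return record
```

Prefixing the attribute keys keeps a payload field such as `timestamp` from being overwritten by an attribute of the same name.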

Use Cases

Event-Driven Architectures

Ingest events from microservices:

    inputs:
      user_events:
        stream: events
        connector_config:
          transport:
            name: nats_input
            config:
              server: "nats://nats:4222"
              subject: "user.>"
          format:
            name: json

Cloud-Native Applications

Integrate with GCP services:

    inputs:
      logs:
        stream: logs
        connector_config:
          transport:
            name: pubsub_input
            config:
              project_id: "my-project"
              subscription_id: "log-processor"
          format:
            name: json

Real-Time Analytics

Process streaming data from NATS:

    CREATE TABLE events (
        event_id BIGINT,
        event_time TIMESTAMP,
        user_id BIGINT,
        action VARCHAR
    );

    CREATE MATERIALIZED VIEW event_counts AS
    SELECT action, COUNT(*) AS count
    FROM events
    GROUP BY action;
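As a reference for checking pipeline output, the aggregation performed by the event_counts view can be mirrored in a few lines of Python over a sample of ingested events. This is a sketch for comparison only; the function name `event_counts` simply echoes the view name, and the sample field names follow the table definition above.

```python
from collections import Counter
from typing import Iterable, Mapping


def event_counts(events: Iterable[Mapping[str, object]]) -> dict:
    """Count events per action, like SELECT action, COUNT(*) ... GROUP BY action."""
    return dict(Counter(event["action"] for event in events))
```

Feeding the same sample messages to the pipeline and to this function should yield the same per-action totals.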

Performance Considerations

Batching

Both connectors batch messages for efficiency. Use max_batch_size to control the batch size and max_queued_records to bound the number of queued records:

    connector_config:
      max_batch_size: 10000
      max_queued_records: 1000000

Parallelism

Increase pipeline workers for higher throughput:

{ "workers": 16 }

Backpressure

Configure backpressure thresholds:

    connector_config:
      max_queued_records: 1000000
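The interplay between max_queued_records and max_batch_size can be sketched as a bounded buffer: the reader stops accepting records once the queue is full and resumes after the pipeline drains a batch. This is a toy model under stated assumptions, not a description of Feldera's internals; the class `BoundedBuffer` and its methods are hypothetical.

```python
from collections import deque


class BoundedBuffer:
    """Toy model of a connector input queue with backpressure."""

    def __init__(self, max_queued_records: int, max_batch_size: int):
        self.max_queued_records = max_queued_records
        self.max_batch_size = max_batch_size
        self._queue = deque()

    @property
    def paused(self) -> bool:
        # Backpressure: stop reading from the source while the queue is full.
        return len(self._queue) >= self.max_queued_records

    def offer(self, record) -> bool:
        """Queue a record unless backpressure is active."""
        if self.paused:
            return False
        self._queue.append(record)
        return True

    def take_batch(self) -> list:
        """Remove and return up to max_batch_size records in arrival order."""
        count = min(self.max_batch_size, len(self._queue))
        return [self._queue.popleft() for _ in range(count)]
```

In this model, a larger max_queued_records absorbs longer bursts at the cost of memory, while a smaller value pushes back on the source sooner.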

What's Next