Internal site. Jolli authentication required to view.
Skip to Content
๐Ÿ”Œ ConnectorsConnector Orchestration

Last Updated: 3/19/2026


Connector Orchestration

Feldera provides connector orchestration features to control the startup order and dependencies between connectors. This enables building ordered ingestion pipelines and coordinating data flow from multiple sources.

Connector Labels

Assign labels to connectors to group them:

inputs: users: stream: users connector_config: labels: ["dimension", "batch1"] transport: name: s3_input config: bucket_name: "my-bucket" key: "users.csv" format: name: csv

Labels are arbitrary strings that can be used to identify and group connectors.

Start After Dependencies

Use start_after to specify that a connector should wait for other connectors to complete:

inputs: users: stream: users connector_config: labels: ["users"] transport: name: s3_input config: bucket_name: "my-bucket" key: "users.csv" format: name: csv orders: stream: orders connector_config: start_after: ["users"] # Wait for users connector transport: name: s3_input config: bucket_name: "my-bucket" key: "orders.csv" format: name: csv

The orders connector will not start until all connectors with the users label have finished processing their inputs.

Multiple Dependencies

Specify multiple labels to wait for multiple connectors:

inputs: users: stream: users connector_config: labels: ["dimension"] transport: name: s3_input config: key: "users.csv" format: name: csv products: stream: products connector_config: labels: ["dimension"] transport: name: s3_input config: key: "products.csv" format: name: csv orders: stream: orders connector_config: start_after: ["dimension"] # Wait for both users and products transport: name: s3_input config: key: "orders.csv" format: name: csv

Pausing and Resuming Connectors

Manually pause and resume input connectors:

# Pause a connector pipeline.pause_connector(table_name="users", connector_name="users") # Resume a connector pipeline.resume_connector(table_name="users", connector_name="users")

This allows dynamic control over data ingestion.

Starting Connectors Paused

Start a connector in the paused state:

inputs: events: stream: events connector_config: paused: true # Start paused transport: name: kafka_input config: bootstrap.servers: "localhost:9092" topics: ["events"] format: name: json

Resume the connector when ready:

pipeline.resume_connector("events", "events")

Use Cases

Ordered Batch Loading

Load dimension tables before fact tables:

inputs: # Load dimensions first users: stream: users connector_config: labels: ["dimension"] transport: name: s3_input config: key: "users.csv" format: name: csv products: stream: products connector_config: labels: ["dimension"] transport: name: s3_input config: key: "products.csv" format: name: csv # Load facts after dimensions orders: stream: orders connector_config: start_after: ["dimension"] transport: name: s3_input config: key: "orders.csv" format: name: csv

Staged Ingestion

Ingest data in stages:

inputs: stage1: stream: data connector_config: labels: ["stage1"] transport: name: s3_input config: prefix: "stage1/" format: name: csv stage2: stream: data connector_config: labels: ["stage2"] start_after: ["stage1"] transport: name: s3_input config: prefix: "stage2/" format: name: csv stage3: stream: data connector_config: start_after: ["stage2"] transport: name: s3_input config: prefix: "stage3/" format: name: csv

Manual Control

Start connectors paused and control them manually:

# Start pipeline with paused connectors pipeline.start() # Resume connectors in order pipeline.resume_connector("users", "users") # ... wait for users to complete ... pipeline.resume_connector("orders", "orders")

Monitoring Connector Status

Check connector status:

stats = pipeline.input_connector_stats("users", "users") print(f"End of input: {stats.end_of_input}") print(f"Total records: {stats.total_records}")

Whatโ€™s Next