Last Updated: 3/19/2026
Connector Orchestration
Feldera provides connector orchestration features to control the startup order and dependencies between connectors. This enables building ordered ingestion pipelines and coordinating data flow from multiple sources.
Connector Labels
Assign labels to connectors to group them:
inputs:
users:
stream: users
connector_config:
labels: ["dimension", "batch1"]
transport:
name: s3_input
config:
bucket_name: "my-bucket"
key: "users.csv"
format:
name: csvLabels are arbitrary strings that can be used to identify and group connectors.
Start After Dependencies
Use start_after to specify that a connector should wait for other connectors to complete:
inputs:
users:
stream: users
connector_config:
labels: ["users"]
transport:
name: s3_input
config:
bucket_name: "my-bucket"
key: "users.csv"
format:
name: csv
orders:
stream: orders
connector_config:
start_after: ["users"] # Wait for users connector
transport:
name: s3_input
config:
bucket_name: "my-bucket"
key: "orders.csv"
format:
name: csvThe orders connector will not start until all connectors with the users label have finished processing their inputs.
Multiple Dependencies
Specify multiple labels to wait for multiple connectors:
inputs:
users:
stream: users
connector_config:
labels: ["dimension"]
transport:
name: s3_input
config:
key: "users.csv"
format:
name: csv
products:
stream: products
connector_config:
labels: ["dimension"]
transport:
name: s3_input
config:
key: "products.csv"
format:
name: csv
orders:
stream: orders
connector_config:
start_after: ["dimension"] # Wait for both users and products
transport:
name: s3_input
config:
key: "orders.csv"
format:
name: csvPausing and Resuming Connectors
Manually pause and resume input connectors:
# Pause a connector
pipeline.pause_connector(table_name="users", connector_name="users")
# Resume a connector
pipeline.resume_connector(table_name="users", connector_name="users")This allows dynamic control over data ingestion.
Starting Connectors Paused
Start a connector in the paused state:
inputs:
events:
stream: events
connector_config:
paused: true # Start paused
transport:
name: kafka_input
config:
bootstrap.servers: "localhost:9092"
topics: ["events"]
format:
name: jsonResume the connector when ready:
pipeline.resume_connector("events", "events")Use Cases
Ordered Batch Loading
Load dimension tables before fact tables:
inputs:
# Load dimensions first
users:
stream: users
connector_config:
labels: ["dimension"]
transport:
name: s3_input
config:
key: "users.csv"
format:
name: csv
products:
stream: products
connector_config:
labels: ["dimension"]
transport:
name: s3_input
config:
key: "products.csv"
format:
name: csv
# Load facts after dimensions
orders:
stream: orders
connector_config:
start_after: ["dimension"]
transport:
name: s3_input
config:
key: "orders.csv"
format:
name: csvStaged Ingestion
Ingest data in stages:
inputs:
stage1:
stream: data
connector_config:
labels: ["stage1"]
transport:
name: s3_input
config:
prefix: "stage1/"
format:
name: csv
stage2:
stream: data
connector_config:
labels: ["stage2"]
start_after: ["stage1"]
transport:
name: s3_input
config:
prefix: "stage2/"
format:
name: csv
stage3:
stream: data
connector_config:
start_after: ["stage2"]
transport:
name: s3_input
config:
prefix: "stage3/"
format:
name: csvManual Control
Start connectors paused and control them manually:
# Start pipeline with paused connectors
pipeline.start()
# Resume connectors in order
pipeline.resume_connector("users", "users")
# ... wait for users to complete ...
pipeline.resume_connector("orders", "orders")Monitoring Connector Status
Check connector status:
stats = pipeline.input_connector_stats("users", "users")
print(f"End of input: {stats.end_of_input}")
print(f"Total records: {stats.total_records}")Whatโs Next
- Kafka Connector: Learn about streaming connectors
- Cloud Storage Connectors (S3/Delta Lake/Iceberg): Learn about batch connectors
- Pipeline Lifecycle: Understand pipeline states