Last Updated: 3/19/2026
S3/Delta Lake/Iceberg Connectors
Feldera provides connectors for reading from and writing to cloud storage systems including Amazon S3, Delta Lake, and Apache Iceberg. These connectors enable batch and streaming ingestion from data lakes and object storage.
S3 Input Connector
The S3 input connector reads objects from Amazon S3 or S3-compatible storage (MinIO, Wasabi, etc.).
Basic Configuration
inputs:
  data:
    stream: events
    connector_config:
      transport:
        name: s3_input
        config:
          aws_access_key_id: "AKIAIOSFODNN7EXAMPLE"
          aws_secret_access_key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
          region: "us-east-1"
          bucket_name: "my-bucket"
          prefix: "events/"
      format:
        name: csv
Configuration Options
Authentication:
- aws_access_key_id: AWS access key ID
- aws_secret_access_key: AWS secret access key
- no_sign_request: Set to true for anonymous access to public buckets (default: false)
Object Selection:
- key: Read a single object by key (e.g., "data/file.csv")
- prefix: Read all objects matching a prefix (e.g., "events/" or "" for all objects)
Region and Endpoint:
- region: AWS region (e.g., "us-east-1")
- bucket_name: S3 bucket name
- endpoint_url: Custom endpoint for S3-compatible services (e.g., "http://minio:9000")
Performance:
- max_concurrent_fetches: Number of objects fetched in parallel (default: 8, range: 1-10)
- max_retries: Number of retry attempts with exponential backoff (default: 5, range: 3-10)
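As a sketch of how these options combine, the fragment below reads a single object anonymously from a public bucket. The bucket name and object key are hypothetical placeholders, and the retry count is only an illustrative value within the documented range.

```yaml
transport:
  name: s3_input
  config:
    no_sign_request: true            # anonymous access, so no access keys are given
    region: "us-east-1"
    bucket_name: "my-public-bucket"  # hypothetical bucket name
    key: "data/file.csv"             # read exactly one object instead of a prefix
    max_retries: 3                   # illustrative value; documented range is 3-10
```

This sketch sets key and leaves prefix out, since the two are listed as alternative ways to select objects.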
Reading Multiple Objects
Use the prefix option to read all objects matching a prefix:
transport:
  name: s3_input
  config:
    aws_access_key_id: "..."
    aws_secret_access_key: "..."
    region: "us-east-1"
    bucket_name: "my-bucket"
    prefix: "events/2024/" # Read all objects under events/2024/
Using IAM Roles
In AWS environments with IAM roles (e.g., EKS with IRSA), omit credentials:
transport:
  name: s3_input
  config:
    region: "us-east-1"
    bucket_name: "my-bucket"
    prefix: "events/"
S3-Compatible Storage
For MinIO or other S3-compatible services:
transport:
  name: s3_input
  config:
    aws_access_key_id: "minioadmin"
    aws_secret_access_key: "minioadmin"
    region: "us-east-1"
    bucket_name: "my-bucket"
    endpoint_url: "http://minio:9000"
    prefix: ""
Delta Lake Connector
Delta Lake is an open-source storage layer that brings ACID transactions to data lakes. Feldera supports both reading from and writing to Delta Lake tables.
Delta Lake Input
The Delta Lake input connector supports three basic ingestion modes; a fourth, CDC mode, is covered in its own section below.
Snapshot mode (read a snapshot of the table and stop):
inputs:
  users:
    stream: users
    connector_config:
      transport:
        name: delta_table_input
        config:
          uri: "s3://my-bucket/users"
          mode: "snapshot"
          AWS_ACCESS_KEY_ID: "..."
          AWS_SECRET_ACCESS_KEY: "..."
          AWS_REGION: "us-east-1"
      format:
        name: parquet
Follow mode (continuously ingest changes starting from a version):
transport:
  name: delta_table_input
  config:
    uri: "s3://my-bucket/users"
    mode: "follow"
    version: 100 # Start from version 100
    AWS_ACCESS_KEY_ID: "..."
    AWS_SECRET_ACCESS_KEY: "..."
Snapshot and follow mode (read a snapshot, then follow changes):
transport:
  name: delta_table_input
  config:
    uri: "s3://my-bucket/users"
    mode: "snapshot_and_follow"
    AWS_ACCESS_KEY_ID: "..."
    AWS_SECRET_ACCESS_KEY: "..."
Delta Lake Configuration Options
Table Location:
- uri: Delta table URI (e.g., "s3://bucket/path", "file:///local/path")
Ingestion Mode:
- mode: One of "snapshot", "follow", "snapshot_and_follow", "cdc"
Version Selection:
- version: Start from a specific table version
- datetime: Start from a specific timestamp (ISO-8601 format)
- end_version: Stop after reaching this version (follow modes only)
Filtering:
- filter: SQL WHERE clause to filter rows (e.g., "age > 18")
- snapshot_filter: Additional filter for the initial snapshot only
- timestamp_column: Column to use for timestamp-ordered ingestion
Performance:
- num_parsers: Number of parallel parsing tasks (default: 4, range: 1-10)
- max_concurrent_readers: Global limit on concurrent object store reads (default: 6)
- skip_unused_columns: Don't read columns not used in views (default: false)
Storage Options: Storage options are passed as flat key-value pairs:
- AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION: AWS credentials and region
- AZURE_STORAGE_ACCOUNT_NAME, AZURE_STORAGE_ACCOUNT_KEY: Azure credentials
- GOOGLE_SERVICE_ACCOUNT: GCS service account path
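To illustrate how several of these options might be combined, here is a minimal sketch of a follow-mode connector bounded to a version range. The version numbers and the age column are hypothetical, and the fragment assumes the options behave as described in the list above.

```yaml
transport:
  name: delta_table_input
  config:
    uri: "s3://my-bucket/users"
    mode: "follow"
    version: 100               # start from table version 100
    end_version: 200           # hypothetical upper bound; stop after this version
    filter: "age > 18"         # assumes the table has an age column
    skip_unused_columns: true  # skip columns that no view reads
    AWS_ACCESS_KEY_ID: "..."
    AWS_SECRET_ACCESS_KEY: "..."
    AWS_REGION: "us-east-1"
```

The storage options sit in the same flat config map as the connector options, as described under Storage Options.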
Timestamp-Ordered Ingestion
For tables with a timestamp column, enable ordered ingestion:
CREATE TABLE events (
    event_id BIGINT,
    event_time TIMESTAMP LATENESS INTERVAL 1 DAY,
    user_id BIGINT,
    action VARCHAR
);
transport:
  name: delta_table_input
  config:
    uri: "s3://my-bucket/events"
    mode: "snapshot"
    timestamp_column: "event_time"
    AWS_ACCESS_KEY_ID: "..."
    AWS_SECRET_ACCESS_KEY: "..."
The connector ingests data in timestamp order, respecting the LATENESS attribute.
CDC Mode
In CDC mode, the Delta table is treated as an append-only log of changes:
transport:
  name: delta_table_input
  config:
    uri: "s3://my-bucket/changes"
    mode: "cdc"
    cdc_delete_filter: "op = 'D'"
    cdc_order_by: "seq_num"
    version: 0
    AWS_ACCESS_KEY_ID: "..."
    AWS_SECRET_ACCESS_KEY: "..."
- cdc_delete_filter: SQL predicate identifying delete operations
- cdc_order_by: SQL expression for ordering updates
Delta Lake Output
Write pipeline output to a Delta Lake table:
outputs:
  results:
    stream: user_stats
    connector_config:
      transport:
        name: delta_table_output
        config:
          uri: "s3://my-bucket/results"
          mode: "append"
          AWS_ACCESS_KEY_ID: "..."
          AWS_SECRET_ACCESS_KEY: "..."
      format:
        name: parquet
Write modes:
- append: Append new data to existing table (default)
- truncate: Truncate existing table before writing
- error_if_exists: Fail if table already exists
Configuration:
- uri: Delta table URI
- mode: Write mode
- max_retries: Maximum retries for failed operations (default: infinite)
- Storage options (same as input)
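As a further sketch, the fragment below replaces the table contents on each run and caps retries instead of retrying indefinitely. The retry count is an arbitrary illustrative value, not a recommended setting.

```yaml
transport:
  name: delta_table_output
  config:
    uri: "s3://my-bucket/results"
    mode: "truncate"   # truncate the existing table before writing
    max_retries: 5     # illustrative cap; the documented default is infinite
    AWS_ACCESS_KEY_ID: "..."
    AWS_SECRET_ACCESS_KEY: "..."
    AWS_REGION: "us-east-1"
```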
Transactions
Delta Lake input supports transaction modes:
transport:
  name: delta_table_input
  config:
    uri: "s3://my-bucket/users"
    mode: "snapshot_and_follow"
    transaction_mode: "always"
Transaction modes:
- none: No transaction boundaries (default)
- snapshot: Ingest snapshot in transactions
- always: All updates in transactions (matches Delta Lake transaction log)
Apache Iceberg Connector
Apache Iceberg is a high-performance table format for large analytic datasets. Feldera supports reading from Iceberg tables.
Iceberg Input Configuration
inputs:
  events:
    stream: events
    connector_config:
      transport:
        name: iceberg_input
        config:
          uri: "s3://my-bucket/warehouse/events"
          mode: "snapshot"
          AWS_ACCESS_KEY_ID: "..."
          AWS_SECRET_ACCESS_KEY: "..."
          AWS_REGION: "us-east-1"
      format:
        name: parquet
Iceberg Configuration Options
The Iceberg connector shares many configuration options with Delta Lake:
- uri: Iceberg table URI
- mode: Ingestion mode, one of "snapshot", "follow", "snapshot_and_follow"
- version: Start from a specific snapshot ID
- datetime: Start from a specific timestamp
- filter: Row filter (SQL WHERE clause)
- timestamp_column: Column for timestamp-ordered ingestion
- Storage options (AWS, Azure, GCS credentials)
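A minimal sketch combining a few of these options is shown below. It assumes the options behave as listed above; the timestamp value and the filter predicate are hypothetical examples.

```yaml
transport:
  name: iceberg_input
  config:
    uri: "s3://my-bucket/warehouse/events"
    mode: "snapshot_and_follow"
    datetime: "2024-01-01T00:00:00Z"  # hypothetical starting timestamp
    filter: "action = 'login'"        # assumes the table has an action column
    AWS_ACCESS_KEY_ID: "..."
    AWS_SECRET_ACCESS_KEY: "..."
    AWS_REGION: "us-east-1"
```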
Iceberg Catalog Integration
Iceberg supports multiple catalog types. Specify catalog configuration via storage options:
AWS Glue Catalog:
transport:
  name: iceberg_input
  config:
    uri: "s3://my-bucket/warehouse/events"
    mode: "snapshot"
    AWS_ACCESS_KEY_ID: "..."
    AWS_SECRET_ACCESS_KEY: "..."
    AWS_REGION: "us-east-1"
    catalog.type: "glue"
    catalog.warehouse: "s3://my-bucket/warehouse"
REST Catalog:
transport:
  name: iceberg_input
  config:
    uri: "http://catalog:8181/warehouse/events"
    mode: "snapshot"
    catalog.type: "rest"
    catalog.uri: "http://catalog:8181"
Performance Tuning
Concurrent Fetches
Increase parallelism for faster ingestion:
# S3
transport:
  name: s3_input
  config:
    max_concurrent_fetches: 10
# Delta Lake
transport:
  name: delta_table_input
  config:
    num_parsers: 8
    max_concurrent_readers: 10
Pipeline Workers
Increase pipeline workers for better throughput:
{
  "workers": 16
}
Compression
Use compressed formats (Parquet, ORC) for better performance and lower storage costs.
Complete Example
Here's a complete pipeline that reads from a Delta Lake table and writes the results to another Delta Lake table on S3:
CREATE TABLE events (
    event_id BIGINT,
    event_time TIMESTAMP,
    user_id BIGINT,
    action VARCHAR
);
-- Group by the full expression and avoid the reserved word HOUR as an alias.
CREATE VIEW hourly_stats AS
SELECT
    DATE_TRUNC('hour', event_time) AS event_hour,
    action,
    COUNT(*) AS event_count
FROM events
GROUP BY DATE_TRUNC('hour', event_time), action;
inputs:
  events:
    stream: events
    connector_config:
      transport:
        name: delta_table_input
        config:
          uri: "s3://data-lake/events"
          mode: "snapshot_and_follow"
          AWS_ACCESS_KEY_ID: "..."
          AWS_SECRET_ACCESS_KEY: "..."
          AWS_REGION: "us-east-1"
      format:
        name: parquet
outputs:
  stats:
    stream: hourly_stats
    connector_config:
      transport:
        name: delta_table_output
        config:
          uri: "s3://data-lake/stats"
          mode: "append"
          AWS_ACCESS_KEY_ID: "..."
          AWS_SECRET_ACCESS_KEY: "..."
      format:
        name: parquet
What's Next
- Kafka Connector: Learn about streaming data ingestion
- Connector Orchestration: Control connector startup order and dependencies
- Pipeline Configuration: Optimize performance settings for large datasets