
Last Updated: 3/19/2026


S3/Delta Lake/Iceberg Connectors

Feldera provides connectors for Amazon S3 object storage and for the Delta Lake and Apache Iceberg table formats. They support batch and streaming ingestion from data lakes and object storage, and Delta Lake tables can also serve as pipeline outputs.

S3 Input Connector

The S3 input connector reads objects from Amazon S3 or S3-compatible storage (MinIO, Wasabi, etc.).

Basic Configuration

inputs:
  data:
    stream: events
    connector_config:
      transport:
        name: s3_input
        config:
          aws_access_key_id: "AKIAIOSFODNN7EXAMPLE"
          aws_secret_access_key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
          region: "us-east-1"
          bucket_name: "my-bucket"
          prefix: "events/"
      format:
        name: csv

Configuration Options

Authentication:

  • aws_access_key_id: AWS access key ID
  • aws_secret_access_key: AWS secret access key
  • no_sign_request: Set to true for anonymous access to public buckets (default: false)

Object Selection:

  • key: Read a single object by key (e.g., "data/file.csv")
  • prefix: Read all objects whose keys start with a prefix (e.g., "events/", or "" for all objects)
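To make the key/prefix distinction concrete, here is a minimal Python sketch that uses a plain list of keys as a stand-in for a bucket listing. The helper `select_objects` is hypothetical, not part of Feldera. It shows that S3 prefixes are literal string prefixes rather than directories, so "events" also matches "events-archive/...". Letting `key` take precedence when both options are set is an assumption of the sketch.

```python
from typing import Iterable, List, Optional


def select_objects(keys: Iterable[str], key: Optional[str] = None,
                   prefix: str = "") -> List[str]:
    """Hypothetical helper: keys the connector would read from a listing."""
    if key is not None:
        # Single-object mode (assumed here to take precedence over `prefix`).
        return [k for k in keys if k == key]
    # Prefix mode: a literal string match; "" matches every object.
    return sorted(k for k in keys if k.startswith(prefix))


bucket = [
    "events/2024/01.csv",
    "events/2024/02.csv",
    "events/2023/12.csv",
    "events-archive/old.csv",
    "data/file.csv",
]

print(select_objects(bucket, prefix="events/2024/"))
# ['events/2024/01.csv', 'events/2024/02.csv']
print(select_objects(bucket, prefix="events"))  # also matches events-archive/
print(select_objects(bucket, key="data/file.csv"))  # ['data/file.csv']
```

Ending a prefix with "/" is the usual way to restrict matching to a single "folder".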

Region and Endpoint:

  • region: AWS region (e.g., "us-east-1")
  • bucket_name: S3 bucket name
  • endpoint_url: Custom endpoint for S3-compatible services (e.g., "http://minio:9000")

Performance:

  • max_concurrent_fetches: Number of objects fetched in parallel (default: 8, range: 1–10)
  • max_retries: Number of retry attempts with exponential backoff (default: 5, range: 3–10)
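The sketch below shows what retrying with exponential backoff means in practice. The 0.5 s base delay, the doubling factor, and the 30 s cap are illustrative assumptions, not Feldera's actual values, and `fetch_with_retries` is a hypothetical helper. In this sketch `max_retries` counts retries after the first attempt.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def backoff_delays(max_retries: int, base: float = 0.5,
                   cap: float = 30.0) -> List[float]:
    """Delay in seconds before each retry: base * 2**n, capped (assumed values)."""
    return [min(cap, base * 2 ** n) for n in range(max_retries)]


def fetch_with_retries(fetch: Callable[[], T], max_retries: int = 5,
                       sleep: Callable[[float], None] = time.sleep) -> T:
    """Hypothetical helper: one initial attempt plus up to max_retries retries."""
    for delay in backoff_delays(max_retries):
        try:
            return fetch()
        except OSError:
            sleep(delay)  # transient failure: back off, then try again
    return fetch()  # last attempt; any error now propagates to the caller


# Usage: a fake fetch that fails twice before succeeding.
attempts = {"n": 0}


def flaky_get() -> bytes:
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise OSError("transient network error")
    return b"object bytes"


print(backoff_delays(5))  # [0.5, 1.0, 2.0, 4.0, 8.0]
print(fetch_with_retries(flaky_get, sleep=lambda s: None))  # b'object bytes'
```

Passing `sleep` as a parameter keeps the sketch testable without real waiting.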

Reading Multiple Objects

Use the prefix option to read all objects matching a prefix:

transport:
  name: s3_input
  config:
    aws_access_key_id: "..."
    aws_secret_access_key: "..."
    region: "us-east-1"
    bucket_name: "my-bucket"
    prefix: "events/2024/"  # Read all objects under events/2024/

Using IAM Roles

In AWS environments with IAM roles (e.g., EKS with IRSA), omit credentials:

transport:
  name: s3_input
  config:
    region: "us-east-1"
    bucket_name: "my-bucket"
    prefix: "events/"

S3-Compatible Storage

For MinIO or other S3-compatible services:

transport:
  name: s3_input
  config:
    aws_access_key_id: "minioadmin"
    aws_secret_access_key: "minioadmin"
    region: "us-east-1"
    bucket_name: "my-bucket"
    endpoint_url: "http://minio:9000"
    prefix: ""

Delta Lake Connector

Delta Lake is an open-source storage layer that brings ACID transactions to data lakes. Feldera supports both reading from and writing to Delta Lake tables.

Delta Lake Input

The Delta Lake input connector supports three ingestion modes:

Snapshot mode: read a snapshot of the table and stop.

inputs:
  users:
    stream: users
    connector_config:
      transport:
        name: delta_table_input
        config:
          uri: "s3://my-bucket/users"
          mode: "snapshot"
          AWS_ACCESS_KEY_ID: "..."
          AWS_SECRET_ACCESS_KEY: "..."
          AWS_REGION: "us-east-1"
      format:
        name: parquet

Follow mode: continuously ingest changes, starting from a given table version.

transport:
  name: delta_table_input
  config:
    uri: "s3://my-bucket/users"
    mode: "follow"
    version: 100  # Start from version 100
    AWS_ACCESS_KEY_ID: "..."
    AWS_SECRET_ACCESS_KEY: "..."

Snapshot and follow mode: read a snapshot, then follow subsequent changes.

transport:
  name: delta_table_input
  config:
    uri: "s3://my-bucket/users"
    mode: "snapshot_and_follow"
    AWS_ACCESS_KEY_ID: "..."
    AWS_SECRET_ACCESS_KEY: "..."
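A small simulation can help compare the three modes. This Python sketch models a hypothetical append-only table as a map from version number to the rows added by that commit. It assumes that snapshot reads the table as of `version` (the latest version if unset), that follow replays the commits after `version`, and that snapshot_and_follow does both. Real Delta tables also record deletes, which the sketch ignores, and a real connector keeps waiting for new commits instead of stopping at the end of a static history. The `ingest` helper is hypothetical.

```python
from typing import Dict, List, Optional, Tuple

# Hypothetical append-only history: version -> rows added by that commit.
History = Dict[int, List[str]]


def ingest(history: History, mode: str,
           version: Optional[int] = None) -> Tuple[List[str], List[List[str]]]:
    """Return (snapshot_rows, follow_batches) for a simulated Delta table."""
    start = max(history) if version is None else version
    versions = sorted(history)
    # Table contents as of `start`: every row added up to that version.
    snapshot = [row for v in versions if v <= start for row in history[v]]
    # Changes committed after `start`, one batch per commit.
    follow = [history[v] for v in versions if v > start]
    if mode == "snapshot":
        return snapshot, []
    if mode == "follow":
        return [], follow
    if mode == "snapshot_and_follow":
        return snapshot, follow
    raise ValueError(f"unsupported mode: {mode}")


history = {0: ["alice"], 1: ["bob"], 2: ["carol"], 3: ["dave"]}
for mode in ("snapshot", "follow", "snapshot_and_follow"):
    print(mode, ingest(history, mode, version=1))
```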

Delta Lake Configuration Options

Table Location:

  • uri: Delta table URI (e.g., "s3://bucket/path", "file:///local/path")

Ingestion Mode:

  • mode: One of "snapshot", "follow", "snapshot_and_follow", "cdc"

Version Selection:

  • version: Start from a specific table version
  • datetime: Start from a specific timestamp (ISO-8601 format)
  • end_version: Stop after reaching this version (follow modes only)
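The sketch below illustrates how a `datetime` value could be resolved to a table version and how `end_version` bounds the versions that are followed. It assumes commit timestamps increase with version numbers and that `datetime` selects the latest version committed at or before the given time. Both helpers are hypothetical, not the connector's code.

```python
import bisect
from datetime import datetime, timezone
from typing import List, Optional


def version_at(commit_times: List[datetime], ts: datetime) -> Optional[int]:
    """Latest version committed at or before ts; list index = version number.

    Assumes commit timestamps increase with the version number.
    """
    idx = bisect.bisect_right(commit_times, ts)
    return idx - 1 if idx > 0 else None  # None: ts precedes the first commit


def versions_to_follow(start: int, latest: int,
                       end_version: Optional[int] = None) -> List[int]:
    """Versions replayed after `start`, stopping at end_version (inclusive)."""
    stop = latest if end_version is None else min(latest, end_version)
    return list(range(start + 1, stop + 1))


# Versions 0..5 committed on 2024-01-01 .. 2024-01-06 (made-up history).
commits = [datetime(2024, 1, d, tzinfo=timezone.utc) for d in range(1, 7)]
start = version_at(commits, datetime.fromisoformat("2024-01-02T12:00:00+00:00"))
print(start)  # 1
print(versions_to_follow(start, latest=len(commits) - 1, end_version=3))  # [2, 3]
```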

Filtering:

  • filter: SQL WHERE clause to filter rows (e.g., "age > 18")
  • snapshot_filter: Additional filter for initial snapshot only
  • timestamp_column: Column to use for timestamp-ordered ingestion
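To show how `filter` and `snapshot_filter` combine, this sketch uses Python predicates in place of the SQL expressions. It assumes, as the option descriptions state, that `filter` applies to every row while `snapshot_filter` is an extra condition applied only to rows from the initial snapshot. The `apply_filters` helper and the `signup_year` column are hypothetical.

```python
from typing import Callable, Dict, List, Optional

Row = Dict[str, int]
Predicate = Callable[[Row], bool]


def apply_filters(rows: List[Row], filter_: Optional[Predicate] = None,
                  snapshot_filter: Optional[Predicate] = None,
                  is_snapshot: bool = False) -> List[Row]:
    """Keep rows passing filter_; snapshot rows must also pass snapshot_filter."""
    kept = []
    for row in rows:
        if filter_ is not None and not filter_(row):
            continue
        if is_snapshot and snapshot_filter is not None and not snapshot_filter(row):
            continue
        kept.append(row)
    return kept


def adults(r: Row) -> bool:  # stands in for filter: "age > 18"
    return r["age"] > 18


def recent(r: Row) -> bool:  # stands in for a hypothetical snapshot_filter
    return r["signup_year"] >= 2024


snapshot_rows = [{"age": 30, "signup_year": 2020}, {"age": 25, "signup_year": 2024},
                 {"age": 12, "signup_year": 2024}]
new_rows = [{"age": 40, "signup_year": 2019}, {"age": 15, "signup_year": 2025}]

print(apply_filters(snapshot_rows, adults, recent, is_snapshot=True))
print(apply_filters(new_rows, adults, recent, is_snapshot=False))
```

Note that the 2019 row survives because it arrives after the snapshot, where only `filter` applies.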

Performance:

  • num_parsers: Number of parallel parsing tasks (default: 4, range: 1–10)
  • max_concurrent_readers: Global limit on concurrent object store reads (default: 6)
  • skip_unused_columns: Don't read columns not used in views (default: false)

Storage Options: Object store credentials are passed as flat key-value pairs in the same config block as the options above:

  • AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION: AWS credentials
  • AZURE_STORAGE_ACCOUNT_NAME, AZURE_STORAGE_ACCOUNT_KEY: Azure credentials
  • GOOGLE_SERVICE_ACCOUNT: GCS service account path

Timestamp-Ordered Ingestion

If the table's timestamp column declares a LATENESS, set timestamp_column to ingest rows in timestamp order:

CREATE TABLE events (
    event_id BIGINT,
    event_time TIMESTAMP LATENESS INTERVAL 1 DAY,
    user_id BIGINT,
    action VARCHAR
);

transport:
  name: delta_table_input
  config:
    uri: "s3://my-bucket/events"
    mode: "snapshot"
    timestamp_column: "event_time"
    AWS_ACCESS_KEY_ID: "..."
    AWS_SECRET_ACCESS_KEY: "..."

The connector ingests data in timestamp order, respecting the LATENESS attribute.
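One way to picture ordered ingestion is to split the rows into consecutive timestamp ranges whose width equals the column's LATENESS, then process the ranges in increasing order. The Python sketch below does exactly that; treating this bucketing as the connector's actual strategy is an assumption, and `lateness_batches` is a hypothetical helper.

```python
from datetime import datetime, timedelta
from typing import Any, Dict, List

Row = Dict[str, Any]


def lateness_batches(rows: List[Row], ts_col: str,
                     lateness: timedelta) -> List[List[Row]]:
    """Split rows into ranges [origin + k*lateness, origin + (k+1)*lateness),
    returned in increasing timestamp order (hypothetical strategy)."""
    if not rows:
        return []
    origin = min(r[ts_col] for r in rows)
    buckets: Dict[int, List[Row]] = {}
    for r in rows:
        # timedelta // timedelta yields the integer index of the range.
        buckets.setdefault((r[ts_col] - origin) // lateness, []).append(r)
    return [sorted(buckets[k], key=lambda r: r[ts_col]) for k in sorted(buckets)]


events = [
    {"event_id": 1, "event_time": datetime(2024, 1, 1, 10, 0)},
    {"event_id": 2, "event_time": datetime(2024, 1, 3, 9, 0)},
    {"event_id": 3, "event_time": datetime(2024, 1, 1, 20, 0)},
    {"event_id": 4, "event_time": datetime(2024, 1, 2, 11, 0)},
]
batches = lateness_batches(events, "event_time", timedelta(days=1))
print([[e["event_id"] for e in b] for b in batches])  # [[1, 3], [4, 2]]
```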

CDC Mode

In CDC mode, the Delta table is treated as an append-only log of changes:

transport:
  name: delta_table_input
  config:
    uri: "s3://my-bucket/changes"
    mode: "cdc"
    cdc_delete_filter: "op = 'D'"
    cdc_order_by: "seq_num"
    version: 0
    AWS_ACCESS_KEY_ID: "..."
    AWS_SECRET_ACCESS_KEY: "..."

CDC options:

  • cdc_delete_filter: SQL predicate identifying delete operations
  • cdc_order_by: SQL expression for ordering updates
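The following sketch replays a change log the way the two options above describe: rows are sorted by the cdc_order_by expression, rows matching cdc_delete_filter delete the row with the same key, and all other rows insert or overwrite it. It assumes the target table has a primary key (`id` here) and uses Python lambdas in place of the SQL predicate and expression. The `apply_cdc` helper is hypothetical.

```python
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]


def apply_cdc(changes: List[Row], key: str,
              is_delete: Callable[[Row], bool],
              order_by: Callable[[Row], Any]) -> Dict[Any, Row]:
    """Replay a change log into a key -> row map (hypothetical helper)."""
    table: Dict[Any, Row] = {}
    for change in sorted(changes, key=order_by):
        if is_delete(change):
            table.pop(change[key], None)  # delete the row with this key
        else:
            table[change[key]] = change   # insert, or overwrite an older version
    return table


# File order differs from sequence order on purpose.
changes = [
    {"id": 1, "name": "alice", "op": "I", "seq_num": 1},
    {"id": 2, "name": "bob", "op": "D", "seq_num": 5},
    {"id": 2, "name": "bob", "op": "I", "seq_num": 2},
    {"id": 2, "name": "bobby", "op": "U", "seq_num": 3},
]

table = apply_cdc(
    changes,
    key="id",
    is_delete=lambda r: r["op"] == "D",  # mirrors cdc_delete_filter: "op = 'D'"
    order_by=lambda r: r["seq_num"],     # mirrors cdc_order_by: "seq_num"
)
print(sorted(table))  # [1]
```

Replaying the same list in file order instead would leave id 2 present, which is why an ordering expression matters.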

Delta Lake Output

Write pipeline output to a Delta Lake table:

outputs:
  results:
    stream: user_stats
    connector_config:
      transport:
        name: delta_table_output
        config:
          uri: "s3://my-bucket/results"
          mode: "append"
          AWS_ACCESS_KEY_ID: "..."
          AWS_SECRET_ACCESS_KEY: "..."
      format:
        name: parquet

Write modes:

  • append: Append new data to existing table (default)
  • truncate: Truncate existing table before writing
  • error_if_exists: Fail if table already exists
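The three write modes can be sketched against an in-memory dictionary of tables keyed by URI. The `open_for_write` helper and `TableExistsError` are hypothetical; they only illustrate the semantics listed above, not how the connector writes Delta Lake files.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


class TableExistsError(Exception):
    """Raised in error_if_exists mode when the target table already exists."""


def open_for_write(tables: Dict[str, List[Row]], uri: str,
                   mode: str = "append") -> List[Row]:
    """Return the row list to write into, applying the write mode (hypothetical)."""
    if mode not in ("append", "truncate", "error_if_exists"):
        raise ValueError(f"unknown mode: {mode}")
    if mode == "error_if_exists" and uri in tables:
        raise TableExistsError(uri)
    if mode == "truncate" or uri not in tables:
        tables[uri] = []  # new table, or existing contents dropped (in this model)
    return tables[uri]  # append: existing rows are kept


uri = "s3://my-bucket/results"
tables = {uri: [{"user_id": 1, "events": 3}]}

open_for_write(tables, uri, "append").append({"user_id": 2, "events": 5})
print(len(tables[uri]))  # 2
open_for_write(tables, uri, "truncate").append({"user_id": 3, "events": 1})
print(len(tables[uri]))  # 1
try:
    open_for_write(tables, uri, "error_if_exists")
except TableExistsError as exc:
    print("refused to write to existing table", exc)
```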

Configuration:

  • uri: Delta table URI
  • mode: Write mode
  • max_retries: Maximum retries for failed operations (default: unlimited)
  • Storage options (same as input)

Transactions

The Delta Lake input connector can group ingested updates into transactions, controlled by transaction_mode:

transport:
  name: delta_table_input
  config:
    uri: "s3://my-bucket/users"
    mode: "snapshot_and_follow"
    transaction_mode: "always"

Transaction modes:

  • none: No transaction boundaries (default)
  • snapshot: Ingest the snapshot in transactions
  • always: Ingest all updates in transactions (boundaries match the Delta Lake transaction log)
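A rough model of how the three modes might group updates: the sketch assumes the initial snapshot becomes one transaction in snapshot and always modes, and that always additionally wraps each Delta Lake commit in its own transaction. The `group_transactions` helper is hypothetical.

```python
from typing import List, Tuple

Batch = List[str]
Unit = Tuple[bool, List[Batch]]  # (applied as one transaction?, batches)


def group_transactions(snapshot: List[Batch], commits: List[Batch],
                       mode: str) -> List[Unit]:
    """Group snapshot batches and per-commit batches by transaction_mode."""
    if mode == "none":
        # Every batch is applied on its own, with no transaction boundaries.
        return [(False, [b]) for b in snapshot + commits]
    units: List[Unit] = [(True, snapshot)] if snapshot else []
    if mode == "snapshot":
        return units + [(False, [c]) for c in commits]
    if mode == "always":
        return units + [(True, [c]) for c in commits]
    raise ValueError(f"unknown transaction_mode: {mode}")


snapshot = [["row-a"], ["row-b"]]  # snapshot read in two batches
commits = [["row-c"], ["row-d"]]   # one batch per Delta Lake commit
for mode in ("none", "snapshot", "always"):
    print(mode, group_transactions(snapshot, commits, mode))
```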

Apache Iceberg Connector

Apache Iceberg is a high-performance table format for large analytic datasets. Feldera supports reading from Iceberg tables.

Iceberg Input Configuration

inputs:
  events:
    stream: events
    connector_config:
      transport:
        name: iceberg_input
        config:
          uri: "s3://my-bucket/warehouse/events"
          mode: "snapshot"
          AWS_ACCESS_KEY_ID: "..."
          AWS_SECRET_ACCESS_KEY: "..."
          AWS_REGION: "us-east-1"
      format:
        name: parquet

Iceberg Configuration Options

The Iceberg connector shares many configuration options with Delta Lake:

  • uri: Iceberg table URI
  • mode: Ingestion mode: "snapshot", "follow", "snapshot_and_follow"
  • version: Start from specific snapshot ID
  • datetime: Start from specific timestamp
  • filter: Row filter (SQL WHERE clause)
  • timestamp_column: Column for timestamp-ordered ingestion
  • Storage options (AWS, Azure, GCS credentials)

Iceberg Catalog Integration

Iceberg supports multiple catalog types. Specify catalog configuration via storage options:

AWS Glue Catalog:

transport:
  name: iceberg_input
  config:
    uri: "s3://my-bucket/warehouse/events"
    mode: "snapshot"
    AWS_ACCESS_KEY_ID: "..."
    AWS_SECRET_ACCESS_KEY: "..."
    AWS_REGION: "us-east-1"
    catalog.type: "glue"
    catalog.warehouse: "s3://my-bucket/warehouse"

REST Catalog:

transport:
  name: iceberg_input
  config:
    uri: "http://catalog:8181/warehouse/events"
    mode: "snapshot"
    catalog.type: "rest"
    catalog.uri: "http://catalog:8181"

Performance Tuning

Concurrent Fetches

Increase parallelism for faster ingestion:

# S3
transport:
  name: s3_input
  config:
    max_concurrent_fetches: 10

# Delta Lake
transport:
  name: delta_table_input
  config:
    num_parsers: 8
    max_concurrent_readers: 10
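A bounded pool of workers is the usual way to cap parallel fetches. This sketch uses Python's `ThreadPoolExecutor` with `max_workers` set to a max_concurrent_fetches value, plus a simulated object store that records its peak concurrency. The `fetch_all` and `fake_fetch` helpers are hypothetical and say nothing about how Feldera schedules reads internally.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List


def fetch_all(keys: List[str], fetch: Callable[[str], bytes],
              max_concurrent_fetches: int = 8) -> Dict[str, bytes]:
    """Fetch every key with at most max_concurrent_fetches requests in flight."""
    with ThreadPoolExecutor(max_workers=max_concurrent_fetches) as pool:
        # pool.map returns results in input order, so zip pairs them correctly.
        return dict(zip(keys, pool.map(fetch, keys)))


# Simulated object store that records how many fetches overlap.
stats = {"in_flight": 0, "peak": 0}
lock = threading.Lock()


def fake_fetch(key: str) -> bytes:
    with lock:
        stats["in_flight"] += 1
        stats["peak"] = max(stats["peak"], stats["in_flight"])
    time.sleep(0.01)  # simulated network latency
    with lock:
        stats["in_flight"] -= 1
    return key.encode()


keys = [f"events/{i}.csv" for i in range(20)]
objects = fetch_all(keys, fake_fetch, max_concurrent_fetches=4)
print(len(objects), stats["peak"])  # 20 objects; peak never exceeds 4
```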

Pipeline Workers

Increase pipeline workers for better throughput:

{ "workers": 16 }

Compression

Prefer columnar formats with built-in compression, such as Parquet or ORC, to reduce both storage costs and the amount of data read during ingestion.

Complete Example

Here's a complete pipeline that reads events from a Delta Lake table and writes hourly aggregates to another Delta Lake table, both stored in S3:

CREATE TABLE events (
    event_id BIGINT,
    event_time TIMESTAMP,
    user_id BIGINT,
    action VARCHAR
);

CREATE VIEW hourly_stats AS
SELECT
    DATE_TRUNC('hour', event_time) as hour,
    action,
    COUNT(*) as event_count
FROM events
GROUP BY hour, action;

inputs:
  events:
    stream: events
    connector_config:
      transport:
        name: delta_table_input
        config:
          uri: "s3://data-lake/events"
          mode: "snapshot_and_follow"
          AWS_ACCESS_KEY_ID: "..."
          AWS_SECRET_ACCESS_KEY: "..."
          AWS_REGION: "us-east-1"
      format:
        name: parquet
outputs:
  stats:
    stream: hourly_stats
    connector_config:
      transport:
        name: delta_table_output
        config:
          uri: "s3://data-lake/stats"
          mode: "append"
          AWS_ACCESS_KEY_ID: "..."
          AWS_SECRET_ACCESS_KEY: "..."
      format:
        name: parquet
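To check what hourly_stats should contain for a given input, this Python sketch recomputes the same aggregation over a few sample rows: truncate event_time to the hour, then count rows per (hour, action). Feldera maintains the view incrementally as new data arrives, whereas the sketch simply recomputes it from scratch; the sample rows are made up.

```python
from collections import Counter
from datetime import datetime
from typing import Any, Dict, List, Tuple


def hourly_stats(events: List[Dict[str, Any]]) -> List[Tuple[datetime, str, int]]:
    """Batch recomputation of the hourly_stats view: (hour, action, event_count)."""
    counts = Counter(
        # Equivalent of DATE_TRUNC('hour', event_time): zero out sub-hour fields.
        (e["event_time"].replace(minute=0, second=0, microsecond=0), e["action"])
        for e in events
    )
    return sorted((hour, action, n) for (hour, action), n in counts.items())


events = [
    {"event_id": 1, "event_time": datetime(2024, 5, 1, 10, 5), "user_id": 7, "action": "click"},
    {"event_id": 2, "event_time": datetime(2024, 5, 1, 10, 40), "user_id": 8, "action": "click"},
    {"event_id": 3, "event_time": datetime(2024, 5, 1, 10, 59), "user_id": 7, "action": "view"},
    {"event_id": 4, "event_time": datetime(2024, 5, 1, 11, 2), "user_id": 9, "action": "click"},
]
for row in hourly_stats(events):
    print(row)
```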

What's Next