
Last Updated: 3/19/2026


Transactions

Feldera supports transactions for atomic batch ingestion. Transactions allow you to group multiple input operations into a single atomic unit that is processed together.

Starting a Transaction

Start a new transaction:

transaction_id = pipeline.start_transaction()
print(f"Started transaction {transaction_id}")

Only one transaction can be active at a time. Starting a new transaction while one is active will fail.
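Because a second start fails while a transaction is open, a caller that may run when a transaction already exists can check first. The helper below is a minimal sketch: `start_transaction_if_idle` is a hypothetical name, not part of the Feldera SDK, and it relies only on the `transaction_id()` and `start_transaction()` methods described on this page.

```python
def start_transaction_if_idle(pipeline):
    """Start a transaction only if none is currently active.

    Hypothetical helper (not part of the Feldera SDK). Returns the new
    transaction ID, or None if a transaction was already active.
    """
    if pipeline.transaction_id() is not None:
        # A transaction is already open; starting another would fail.
        return None
    return pipeline.start_transaction()
```

The check and the start are two separate calls, so this is not race-free if another client can start a transaction in between. In that case, the failing `start_transaction()` call remains the authoritative signal.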

Ingesting Data in a Transaction

Push data within the transaction:

transaction_id = pipeline.start_transaction()

# Push records to several tables
pipeline.input_json("users", {"id": 1, "name": "Alice"})
pipeline.input_json("users", {"id": 2, "name": "Bob"})
pipeline.input_json("orders", {"id": 1, "user_id": 1, "amount": 100.0})

# Commit the transaction
pipeline.commit_transaction(transaction_id)

All data pushed between start_transaction() and commit_transaction() is processed as a single atomic unit.
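The start/push/commit sequence can be wrapped in a Python context manager so that the commit is never forgotten. This is a sketch under stated assumptions: `transaction` is a hypothetical helper, not an SDK function. It uses only `start_transaction()` and `commit_transaction(transaction_id=..., wait=..., timeout_s=...)` as shown on this page.

```python
from contextlib import contextmanager


@contextmanager
def transaction(pipeline, timeout_s=None):
    """Run the body of a `with` block inside one Feldera transaction.

    Hypothetical helper (not part of the Feldera SDK). Starts a
    transaction, yields its ID, and commits only if the body completes
    without raising. On an exception the transaction is left open.
    """
    transaction_id = pipeline.start_transaction()
    yield transaction_id
    # Reached only when the body did not raise. Passing the ID lets the
    # commit verify that this is still the active transaction.
    pipeline.commit_transaction(
        transaction_id=transaction_id, wait=True, timeout_s=timeout_s
    )
```

In use, `with transaction(pipeline) as tid:` wraps the `input_json()` calls. This page documents no abort or rollback call, so the sketch does not try to undo anything when the body raises. The open transaction is left for the caller to handle.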

Committing Transactions

Commit the active transaction:

# Commit and return immediately, without waiting for completion
pipeline.commit_transaction(wait=False)

# Commit and block until the commit completes (up to 60 seconds)
pipeline.commit_transaction(wait=True, timeout_s=60)

Optionally, pass the transaction ID so the commit verifies that it matches the active transaction:

pipeline.commit_transaction(transaction_id=123, wait=True)

Transaction Status

Check the current transaction status:

from feldera import TransactionStatus

status = pipeline.transaction_status()
if status == TransactionStatus.NoTransaction:
    print("No active transaction")
elif status == TransactionStatus.InProgress:
    print("Transaction in progress")
elif status == TransactionStatus.Committing:
    print("Transaction committing")

Get the active transaction ID:

tid = pipeline.transaction_id()
if tid is None:
    print("No active transaction")
else:
    print(f"Active transaction: {tid}")
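A non-blocking commit (`wait=False`) can be combined with `transaction_id()` to wait for completion while doing other work. The sketch below makes one assumption that this page does not state: `transaction_id()` keeps returning the ID while the commit is running and returns `None` once it has finished. `commit_and_poll` is a hypothetical helper name.

```python
import time


def commit_and_poll(pipeline, poll_interval_s=0.5, timeout_s=60.0):
    """Start a commit without blocking, then poll until it completes.

    Hypothetical helper. ASSUMES transaction_id() returns None once the
    commit has finished. Returns True if no transaction is active by the
    deadline, False otherwise.
    """
    pipeline.commit_transaction(wait=False)
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if pipeline.transaction_id() is None:
            return True
        # Other work could be done here instead of sleeping.
        time.sleep(poll_interval_s)
    # One last check in case the commit finished right at the deadline.
    return pipeline.transaction_id() is None
```

In most cases `commit_transaction(wait=True, timeout_s=...)` is simpler. Polling is mainly useful when the caller has to interleave other work with the commit.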

Use Cases

Atomic Batch Ingestion

Ensure a batch of related records is processed together:

transaction_id = pipeline.start_transaction()

# Load a batch of users and their orders
for user in users_batch:
    pipeline.input_json("users", user)
for order in orders_batch:
    pipeline.input_json("orders", order)

pipeline.commit_transaction(wait=True)
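For a long stream of records, a common pattern is to commit every N records so that each group is processed atomically. The sketch below assumes the input arrives as `(table, record)` pairs. `ingest_in_batches` is a hypothetical helper, not an SDK function, and it calls only `start_transaction()`, `input_json()`, and `commit_transaction()`.

```python
from itertools import islice


def ingest_in_batches(pipeline, events, batch_size, timeout_s=None):
    """Push (table, record) pairs in transactions of up to batch_size records.

    Hypothetical helper (not part of the Feldera SDK). Each group is pushed
    with input_json() inside its own transaction and committed before the
    next group starts. Returns the transaction IDs in commit order.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(events)
    transaction_ids = []
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return transaction_ids
        transaction_id = pipeline.start_transaction()
        for table, record in batch:
            pipeline.input_json(table, record)
        pipeline.commit_transaction(
            transaction_id=transaction_id, wait=True, timeout_s=timeout_s
        )
        transaction_ids.append(transaction_id)
```

Fixed-size batching can split related records across transactions. If a user and its orders must always be processed together, group them into a single unit before batching.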

Consistent Snapshots

Load a consistent snapshot of multiple tables:

transaction_id = pipeline.start_transaction()

# Load snapshot from multiple sources
pipeline.input_pandas("users", users_df)
pipeline.input_pandas("products", products_df)
pipeline.input_pandas("orders", orders_df)

pipeline.commit_transaction(wait=True)
print("Snapshot loaded")
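The snapshot pattern generalizes to any number of tables. In the sketch below, `load_snapshot` is a hypothetical helper that takes a mapping from table name to DataFrame. It passes the transaction ID on commit so that the commit is verified against the active transaction.

```python
def load_snapshot(pipeline, frames, timeout_s=None):
    """Load several pandas DataFrames as one atomic snapshot.

    Hypothetical helper (not part of the Feldera SDK). `frames` maps a
    table name to a DataFrame. Every frame is pushed with input_pandas()
    inside a single transaction. Returns the transaction ID.
    """
    transaction_id = pipeline.start_transaction()
    for table, df in frames.items():
        pipeline.input_pandas(table, df)
    # Verify the ID on commit and wait until the snapshot is committed.
    pipeline.commit_transaction(
        transaction_id=transaction_id, wait=True, timeout_s=timeout_s
    )
    return transaction_id
```

Tables are loaded in the mapping's iteration order. Because everything is committed together, the order does not affect which snapshot downstream views observe.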

Transaction Boundaries

Transactions create boundaries in the input stream. The pipeline processes all data within a transaction before moving to the next transaction.

Without transactions, data is processed continuously as it arrives. With transactions, processing is batched by transaction boundaries.
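To make the difference concrete, the toy model below tracks a single row count as an observer would see it. It is a conceptual illustration only and does not reflect how the Feldera engine is implemented. Outside a transaction, each pushed row updates the visible result right away. Inside a transaction, rows are held until the commit and then applied in one step.

```python
class ToyView:
    """Conceptual model of transaction boundaries (not Feldera's engine)."""

    def __init__(self):
        self.visible_count = 0   # what a downstream reader would observe
        self._pending = None     # rows buffered while a transaction is open

    def start_transaction(self):
        if self._pending is not None:
            raise RuntimeError("a transaction is already active")
        self._pending = []

    def push(self, row):
        if self._pending is None:
            self.visible_count += 1      # processed as it arrives
        else:
            self._pending.append(row)    # held until the commit boundary

    def commit(self):
        if self._pending is None:
            raise RuntimeError("no active transaction")
        self.visible_count += len(self._pending)  # applied as one step
        self._pending = None
```

After `push("a")` the count is 1. After `start_transaction()`, `push("b")`, and `push("c")` it is still 1, and after `commit()` it jumps to 3 at once. An observer never sees the intermediate value 2.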

What’s Next