Last Updated: 3/19/2026
Transactions
Feldera supports transactions for atomic batch ingestion. Transactions allow you to group multiple input operations into a single atomic unit that is processed together.
Starting a Transaction
Start a new transaction:
transaction_id = pipeline.start_transaction()
print(f"Started transaction {transaction_id}")
Only one transaction can be active at a time. Starting a new transaction while one is active will fail.
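Since starting a second transaction fails while one is active, a client can check for an active transaction first. The following is a minimal sketch, not part of the Feldera SDK: start_if_idle is a hypothetical helper that uses only transaction_id() (None when no transaction is active) and start_transaction(), as described on this page. FakePipeline is a hypothetical in-memory stand-in so the example runs without a Feldera instance.

```python
from typing import Optional


class FakePipeline:
    """Hypothetical stand-in for a Feldera pipeline, used only to run this sketch."""

    def __init__(self) -> None:
        self._active: Optional[int] = None
        self._next_id = 1

    def transaction_id(self) -> Optional[int]:
        # None means no transaction is active.
        return self._active

    def start_transaction(self) -> int:
        # Mirrors the documented rule: only one active transaction at a time.
        if self._active is not None:
            raise RuntimeError("a transaction is already active")
        self._active = self._next_id
        self._next_id += 1
        return self._active


def start_if_idle(pipeline) -> int:
    """Hypothetical helper: start a transaction, or fail with a clear message.

    The check and the start are separate calls, so this does not prevent a
    race with another client that starts a transaction in between.
    """
    active = pipeline.transaction_id()
    if active is not None:
        raise RuntimeError(f"transaction {active} is still active; commit it first")
    return pipeline.start_transaction()


pipeline = FakePipeline()
tid = start_if_idle(pipeline)
print(f"Started transaction {tid}")
try:
    start_if_idle(pipeline)  # fails: transaction 1 has not been committed
except RuntimeError as err:
    print(err)
```

Because the check and the start are two separate calls, treat the helper as a source of clearer error messages, not as a lock.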
Ingesting Data in a Transaction
Push data within the transaction:
transaction_id = pipeline.start_transaction()
# Push records into several tables
pipeline.input_json("users", {"id": 1, "name": "Alice"})
pipeline.input_json("users", {"id": 2, "name": "Bob"})
pipeline.input_json("orders", {"id": 1, "user_id": 1, "amount": 100.0})
# Commit the transaction
pipeline.commit_transaction(transaction_id)
All data pushed during the transaction is processed atomically.
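To make sure every start_transaction() is paired with a commit, the two calls can be wrapped in a context manager. This is a sketch under assumptions: transaction is a hypothetical helper, not an SDK function, and RecordingPipeline is a hypothetical stand-in that records calls instead of talking to Feldera. This page does not describe a way to abort a transaction, so if the body raises, the helper skips the commit and re-raises, leaving the transaction open for the caller to handle.

```python
from contextlib import contextmanager
from typing import Iterator, List, Optional, Tuple


@contextmanager
def transaction(pipeline, wait: bool = True) -> Iterator[int]:
    """Hypothetical helper: start a transaction, yield its ID, commit on success."""
    tid = pipeline.start_transaction()
    yield tid
    # Reached only if the with-body finished without raising; an exception
    # propagates from the yield and the commit is skipped.
    pipeline.commit_transaction(transaction_id=tid, wait=wait)


class RecordingPipeline:
    """Hypothetical stand-in that records calls instead of talking to Feldera."""

    def __init__(self) -> None:
        self.calls: List[Tuple] = []
        self.active: Optional[int] = None

    def start_transaction(self) -> int:
        self.active = 7
        self.calls.append(("start",))
        return self.active

    def input_json(self, table: str, record: dict) -> None:
        self.calls.append(("input", table, record["id"]))

    def commit_transaction(self, transaction_id=None, wait=True, timeout_s=None) -> None:
        assert transaction_id == self.active, "transaction ID mismatch"
        self.calls.append(("commit", transaction_id, wait))
        self.active = None


pipeline = RecordingPipeline()
with transaction(pipeline) as tid:
    pipeline.input_json("users", {"id": 1, "name": "Alice"})
    pipeline.input_json("orders", {"id": 1, "user_id": 1, "amount": 100.0})
print(pipeline.calls)
```

With a real pipeline, the with block would take the place of the explicit start_transaction() and commit_transaction(transaction_id) calls shown above.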
Committing Transactions
Commit the active transaction:
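# Commit and return immediately, without waiting for the commit to complete
pipeline.commit_transaction(wait=False)
# Commit and wait for completion, with a 60-second timeout
pipeline.commit_transaction(wait=True, timeout_s=60)
Optionally, pass the ID returned by start_transaction() to verify that it matches the active transaction: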
pipeline.commit_transaction(transaction_id=transaction_id, wait=True)
Transaction Status
Check the current transaction status:
from feldera import TransactionStatus
status = pipeline.transaction_status()
if status == TransactionStatus.NoTransaction:
    print("No active transaction")
elif status == TransactionStatus.InProgress:
    print("Transaction in progress")
elif status == TransactionStatus.Committing:
    print("Transaction committing")
Get the active transaction ID:
tid = pipeline.transaction_id()
if tid is None:
    print("No active transaction")
else:
    print(f"Active transaction: {tid}")
Use Cases
Atomic Batch Ingestion
Ensure a batch of related records is processed together:
transaction_id = pipeline.start_transaction()
# Load a batch of users and their orders
for user in users_batch:
    pipeline.input_json("users", user)
for order in orders_batch:
    pipeline.input_json("orders", order)
pipeline.commit_transaction(wait=True)
Consistent Snapshots
Load a consistent snapshot of multiple tables:
transaction_id = pipeline.start_transaction()
# Load snapshot from multiple sources
pipeline.input_pandas("users", users_df)
pipeline.input_pandas("products", products_df)
pipeline.input_pandas("orders", orders_df)
pipeline.commit_transaction(wait=True)
print("Snapshot loaded")
Transaction Boundaries
Transactions create boundaries in the input stream. The pipeline processes all data within a transaction before moving to the next transaction.
Without transactions, data is processed continuously as it arrives. With transactions, processing is batched at transaction boundaries.
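The difference can be illustrated with a toy model. This is not Feldera's implementation: processing_units is a hypothetical function that groups a stream of input rows into the units that get processed together, and START and COMMIT are hypothetical markers standing in for start_transaction() and commit_transaction(). A running total plays the role of a downstream view.

```python
from typing import List, Union

# Hypothetical markers standing in for start_transaction() / commit_transaction().
START, COMMIT = "START", "COMMIT"


def processing_units(stream: List[Union[int, str]]) -> List[List[int]]:
    """Toy model: group input rows into units that are processed together."""
    units: List[List[int]] = []
    pending: List[int] = []
    in_txn = False
    for item in stream:
        if item == START:
            in_txn = True
        elif item == COMMIT:
            units.append(pending)  # the whole transaction becomes one unit
            pending, in_txn = [], False
        elif in_txn:
            pending.append(item)  # buffered until the commit
        else:
            units.append([item])  # no transaction: each row is processed on arrival
    # Rows of a transaction that was never committed are not emitted.
    return units


stream = [1, 2, START, 3, 4, 5, COMMIT, 6]
units = processing_units(stream)
print(units)  # [[1], [2], [3, 4, 5], [6]]

# Running total a downstream view would expose after each unit.
total, visible = 0, []
for unit in units:
    total += sum(unit)
    visible.append(total)
print(visible)  # [1, 3, 15, 21]
```

Without the transaction, the total would also pass through 6 and 10; with it, rows 3, 4, and 5 become visible in a single step.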
What's Next
- Pipeline Lifecycle: Understand pipeline states and transitions
- Fault Tolerance: How transactions interact with checkpointing
- HTTP Connector: Push data to pipelines via HTTP