Last Updated: 3/19/2026
Fault Tolerance
Feldera provides fault tolerance capabilities that allow pipelines to recover from failures without losing data and, in the exactly-once model, without duplicating it. When fault tolerance is enabled, a pipeline that shuts down abruptly or crashes can restart and resume where it left off.
Fault Tolerance Models
Feldera supports two fault tolerance models that provide different guarantees:
Exactly-Once
The exactly_once model ensures that each record is output exactly once, even in the presence of failures. Crashes do not drop or duplicate input or output.
```json
{
  "fault_tolerance": {
    "model": "exactly_once"
  }
}
```
This is the default model when fault tolerance is enabled and provides the strongest consistency guarantees.
At-Least-Once
The at_least_once model ensures that each record is output at least once. Crashes may duplicate output, but no input or output is dropped.
```json
{
  "fault_tolerance": {
    "model": "at_least_once"
  }
}
```
This model has slightly lower overhead than exactly-once but may produce duplicate outputs after recovery.
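To see where the two models differ, here is a toy simulation. It is not Feldera's implementation. A pipeline doubles each input, takes a checkpoint every `interval` inputs, and crashes once, after which it resumes from the last checkpoint. The exactly-once branch assumes the output sink can recognize outputs it has already accepted. That assumption only stands in for whatever mechanism Feldera actually uses.

```python
def run(inputs, crash_after, interval, exactly_once):
    """Toy model: process inputs, crash once, resume from the last checkpoint.

    Returns the outputs observed downstream, in delivery order.
    """
    delivered = []       # outputs the downstream consumer has received
    checkpoint_pos = 0   # number of inputs covered by the last checkpoint
    pos = 0              # next input to process
    crashed = False
    while pos < len(inputs):
        if not crashed and pos == crash_after:
            crashed = True
            pos = checkpoint_pos  # recovery rewinds to the last checkpoint
            continue
        # Assumption: in the exactly-once model, outputs delivered before the
        # crash are recognized and not sent again; at-least-once re-sends them.
        already_delivered = pos < len(delivered)
        if not (exactly_once and already_delivered):
            delivered.append(inputs[pos] * 2)
        pos += 1
        if pos % interval == 0:
            checkpoint_pos = pos  # take a checkpoint


    return delivered


print(run([1, 2, 3, 4, 5], crash_after=3, interval=2, exactly_once=False))  # [2, 4, 6, 6, 8, 10]
print(run([1, 2, 3, 4, 5], crash_after=3, interval=2, exactly_once=True))   # [2, 4, 6, 8, 10]
```

In the at-least-once run, the output for the third input (6) is delivered twice. It was produced after the last checkpoint but before the crash, so it is recomputed and re-sent during recovery.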
Disabling Fault Tolerance
To disable fault tolerance entirely, set the model to "none" or omit the fault_tolerance configuration:
```json
{
  "fault_tolerance": {
    "model": "none"
  }
}
```
Checkpointing
Checkpoints are snapshots of pipeline state that enable recovery after failures. Feldera supports both automatic periodic checkpointing and manual on-demand checkpoints.
Automatic Checkpointing
Configure automatic checkpointing with the checkpoint_interval_secs parameter:
```json
{
  "fault_tolerance": {
    "model": "exactly_once",
    "checkpoint_interval_secs": 60
  }
}
```
The pipeline creates a checkpoint every checkpoint_interval_secs seconds (default: 60; valid range: 1 to 3600). Set it to null to disable periodic checkpointing:
```json
{
  "fault_tolerance": {
    "model": "exactly_once",
    "checkpoint_interval_secs": null
  }
}
```
Manual Checkpointing
Create checkpoints on demand using the API or Python SDK:
```python
# pipeline is a feldera Pipeline instance

# Create a checkpoint and return immediately
seq = pipeline.checkpoint()

# Create a checkpoint and wait up to 300 seconds for it to complete
seq = pipeline.checkpoint(wait=True, timeout_s=300)
```
Both calls return the checkpoint's sequence number. Pass it to checkpoint_status() to check on the checkpoint:
```python
from feldera import CheckpointStatus

status = pipeline.checkpoint_status(seq)
if status == CheckpointStatus.Success:
    print(f"Checkpoint {seq} completed successfully")
elif status == CheckpointStatus.Failure:
    print(f"Checkpoint {seq} failed: {status.get_error()}")
```
Checkpoint statuses:
- Success: the checkpoint completed successfully
- Failure: the checkpoint failed (the status includes an error message)
- InProgress: the checkpoint is still being created
- Unknown: the checkpoint sequence number is unknown or too old
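pipeline.checkpoint(wait=True) already blocks until the checkpoint finishes. If you create a checkpoint without waiting and check on it later, a polling loop over the statuses above might look like the sketch below. wait_for_checkpoint is a hypothetical helper. The CheckpointStatus enum here is a stand-in so the sketch runs without the SDK. With the SDK, import the real CheckpointStatus instead and pass pipeline.checkpoint_status as get_status. Unlike the SDK's Failure status, the stand-in Failure member does not carry an error message.

```python
import enum
import time


class CheckpointStatus(enum.Enum):
    """Stand-in for the SDK's CheckpointStatus so this sketch runs on its own."""
    Success = enum.auto()
    Failure = enum.auto()
    InProgress = enum.auto()
    Unknown = enum.auto()


def wait_for_checkpoint(get_status, seq, timeout_s=300.0, poll_interval_s=1.0):
    """Poll get_status(seq) until the checkpoint succeeds or fails.

    Raises TimeoutError if it is still in progress after timeout_s seconds,
    and LookupError if the sequence number is unknown or too old.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        status = get_status(seq)
        if status in (CheckpointStatus.Success, CheckpointStatus.Failure):
            return status
        if status == CheckpointStatus.Unknown:
            raise LookupError(f"checkpoint {seq} is unknown or too old")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"checkpoint {seq} still in progress after {timeout_s}s")
        time.sleep(poll_interval_s)  # InProgress: check again later


# Demo with a fake status source: InProgress twice, then Success.
responses = iter([CheckpointStatus.InProgress, CheckpointStatus.InProgress,
                  CheckpointStatus.Success])
result = wait_for_checkpoint(lambda seq: next(responses), seq=7, poll_interval_s=0.01)
print(result)  # CheckpointStatus.Success
```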
Storage Requirements
Fault tolerance requires storage to be enabled in the pipeline configuration. The storage backend is used to persist checkpoints and logs:
```json
{
  "storage": {
    "backend": {
      "name": "default"
    }
  },
  "storage_config": {
    "path": "/data/pipeline-state"
  },
  "fault_tolerance": {
    "model": "exactly_once",
    "checkpoint_interval_secs": 60
  }
}
```
The storage_config.path directory stores:
- Checkpoints: snapshots of pipeline state
- Logs: write-ahead logs for recovery
- Metadata: checkpoint metadata and recovery information
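You can also assemble the storage and fault_tolerance settings above in code, for example before submitting a pipeline configuration. The helper below is hypothetical and not part of the Feldera SDK. It only mirrors the rules stated on this page: the three model names, the 1 to 3600 second range for checkpoint_interval_secs, and the requirement that fault tolerance have storage enabled.

```python
import json

MODELS = {"exactly_once", "at_least_once", "none"}


def build_runtime_config(model="exactly_once", checkpoint_interval_secs=60,
                         storage_path="/data/pipeline-state"):
    """Hypothetical helper: return the fault tolerance settings as a dict."""
    if model not in MODELS:
        raise ValueError(f"unknown fault tolerance model: {model!r}")
    if model == "none":
        return {"fault_tolerance": {"model": "none"}}
    # None serializes to JSON null, which disables periodic checkpointing.
    if checkpoint_interval_secs is not None and not 1 <= checkpoint_interval_secs <= 3600:
        raise ValueError("checkpoint_interval_secs must be null or 1 to 3600")
    return {
        # Fault tolerance requires storage for checkpoints and logs.
        "storage": {"backend": {"name": "default"}},
        "storage_config": {"path": storage_path},
        "fault_tolerance": {
            "model": model,
            "checkpoint_interval_secs": checkpoint_interval_secs,
        },
    }


print(json.dumps(build_runtime_config(), indent=2))
```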
Recovery Process
When a pipeline with fault tolerance enabled restarts after a failure, it goes through the following recovery process:
- Bootstrapping: the pipeline loads the most recent successful checkpoint from storage
- Replaying: the pipeline replays logged inputs that were received after the checkpoint
- Running: the pipeline resumes normal operation
This process ensures that no inputs are lost and outputs are consistent with the configured fault tolerance model.
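The three recovery steps can be illustrated with a toy model. Here the pipeline state is a running total and the write-ahead log is a list of received inputs. This is a conceptual sketch, not Feldera's recovery code. Checkpoint, process, and recover are hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class Checkpoint:
    state: int       # snapshot of the pipeline state (here, a running total)
    log_offset: int  # number of logged inputs already reflected in `state`


def process(state, record):
    """Apply one input record to the state."""
    return state + record


def recover(checkpoint, input_log):
    # Bootstrapping: load the most recent successful checkpoint.
    state = checkpoint.state
    # Replaying: reprocess the inputs logged after that checkpoint.
    for record in input_log[checkpoint.log_offset:]:
        state = process(state, record)
    # Running: the caller resumes normal processing from this state.
    return state


log = [5, 3, 8, 2, 7]                         # inputs received before the crash
ckpt = Checkpoint(state=5 + 3, log_offset=2)  # taken after the first two inputs
print(recover(ckpt, log))  # 25, the same total as an uninterrupted run
```

Replay starts at the checkpoint's log offset, so each logged input affects the recovered state exactly once. A longer gap between checkpoints simply means more log entries to replay.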
Bootstrap Policies
Control how the pipeline handles bootstrapping with the bootstrap_policy parameter:
```python
from feldera import BootstrapPolicy

# Automatically bootstrap (default)
pipeline.start(bootstrap_policy=BootstrapPolicy.ALLOW)

# Fail if bootstrapping is required
pipeline.start(bootstrap_policy=BootstrapPolicy.REJECT)

# Wait for explicit approval before bootstrapping
pipeline.start(bootstrap_policy=BootstrapPolicy.AWAIT_APPROVAL)
```
The AWAIT_APPROVAL policy is useful when you want to manually verify conditions before allowing the pipeline to bootstrap and potentially recompute large amounts of state.
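The three policies amount to a small decision table. The function below only restates the behavior described above. The Policy enum is a stand-in for BootstrapPolicy, and the returned action strings are illustrative labels, not values the Feldera API returns.

```python
from enum import Enum, auto


class Policy(Enum):
    """Stand-in for feldera's BootstrapPolicy (same three options)."""
    ALLOW = auto()
    REJECT = auto()
    AWAIT_APPROVAL = auto()


def start_action(policy, bootstrap_required, approved=False):
    """Return what the pipeline does at startup under each documented policy."""
    if not bootstrap_required:
        return "run"        # nothing to bootstrap, so every policy proceeds
    if policy is Policy.ALLOW:
        return "bootstrap"  # bootstrap automatically
    if policy is Policy.REJECT:
        return "fail"       # refuse to start when bootstrapping is required
    # AWAIT_APPROVAL: hold until someone explicitly approves
    return "bootstrap" if approved else "wait_for_approval"


for p in Policy:
    print(p.name, start_action(p, bootstrap_required=True))
```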
Checkpoint Synchronization to Object Storage
For disaster recovery and cross-environment migration, Feldera can synchronize checkpoints to object storage (S3, GCS, Azure Blob Storage).
Configuring Checkpoint Sync
Configure checkpoint synchronization in the storage backend:
```json
{
  "storage_config": {
    "path": "/data/pipeline-state",
    "cache": "page_cache"
  },
  "storage": {
    "backend": {
      "name": "file",
      "config": {
        "sync": {
          "bucket": "my-bucket/checkpoints",
          "region": "us-east-1",
          "provider": "AWS",
          "access_key": "AKIAIOSFODNN7EXAMPLE",
          "secret_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
          "push_interval": 300,
          "retention_min_count": 10,
          "retention_min_age": 30
        }
      }
    }
  }
}
```
Sync configuration options:
- bucket: S3 bucket name (may include a path prefix)
- region: AWS region (optional for MinIO)
- provider: cloud provider, e.g., "AWS" or "Minio"
- access_key / secret_key: authentication credentials (optional if using IAM roles)
- endpoint: custom endpoint for S3-compatible storage (e.g., MinIO)
- push_interval: interval in seconds between automatic checkpoint pushes (optional)
- retention_min_count: minimum number of checkpoints to retain (default: 10)
- retention_min_age: minimum age in days before a checkpoint can be deleted (default: 30)
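Instead of hard-coding credentials as in the example above, you can build the sync object in code and read credentials from the environment. The sketch below is hypothetical: build_sync_config is not a Feldera function. Reading the standard AWS variable names AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY is a choice made by this sketch. Leaving both unset omits access_key and secret_key, as you would when relying on IAM roles.

```python
import os


def build_sync_config(bucket, provider="AWS", region=None, endpoint=None,
                      push_interval=None, retention_min_count=10,
                      retention_min_age=30, env=None):
    """Hypothetical helper: return the storage section with a "sync" object."""
    if not bucket:
        raise ValueError("bucket is required")
    env = os.environ if env is None else env
    sync = {
        "bucket": bucket,
        "provider": provider,
        "retention_min_count": retention_min_count,  # documented default: 10
        "retention_min_age": retention_min_age,      # days; documented default: 30
    }
    # Optional settings are only included when given.
    for key, value in (("region", region), ("endpoint", endpoint),
                       ("push_interval", push_interval)):
        if value is not None:
            sync[key] = value
    # Assumption: credentials come from the standard AWS environment variables.
    access_key = env.get("AWS_ACCESS_KEY_ID")
    secret_key = env.get("AWS_SECRET_ACCESS_KEY")
    if (access_key is None) != (secret_key is None):
        raise ValueError("set both credentials or neither")
    if access_key is not None:
        sync["access_key"] = access_key
        sync["secret_key"] = secret_key
    return {"storage": {"backend": {"name": "file", "config": {"sync": sync}}}}
```

For MinIO, you would pass provider="Minio" and an endpoint, and region can be left out.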
Manual Checkpoint Sync
Manually sync a checkpoint to object storage:
```python
# Sync the latest checkpoint
uuid = pipeline.sync_checkpoint()

# Sync and wait for completion
uuid = pipeline.sync_checkpoint(wait=True, timeout_s=600)
```
Check sync status:
```python
status = pipeline.sync_checkpoint_status(uuid)
if status == CheckpointStatus.Success:
    print(f"Checkpoint {uuid} synced successfully")
```
Get the UUID of the last successfully synced checkpoint:
```python
from uuid import UUID

last_uuid: UUID = pipeline.last_successful_checkpoint_sync()
```
Starting from a Synced Checkpoint
Configure the pipeline to start from a checkpoint in object storage:
```json
{
  "storage": {
    "backend": {
      "name": "file",
      "config": {
        "sync": {
          "bucket": "my-bucket/checkpoints",
          "start_from_checkpoint": "latest",
          "fail_if_no_checkpoint": false,
          "standby": true,
          "pull_interval": 10
        }
      }
    }
  }
}
```
Start-from-checkpoint options:
- start_from_checkpoint: checkpoint to load, either "latest" or a specific UUID
- fail_if_no_checkpoint: if true, fail if the checkpoint cannot be fetched (default: false)
- standby: if true, start in standby mode and wait for activation (default: false)
- pull_interval: interval in seconds between fetches of the latest checkpoint while in standby (default: 10)
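A typo in start_from_checkpoint is easy to miss, so it can help to validate the value before submitting the configuration. The hypothetical helper below accepts "latest" or any string that Python's uuid.UUID can parse. It normalizes UUIDs to their canonical lowercase form and fills in the documented defaults for the other options.

```python
import uuid


def build_start_config(bucket, start_from_checkpoint="latest",
                       fail_if_no_checkpoint=False, standby=False, pull_interval=10):
    """Hypothetical helper: return a "sync" object for starting from a checkpoint."""
    if start_from_checkpoint != "latest":
        try:
            # Accept a specific checkpoint UUID and normalize its format.
            start_from_checkpoint = str(uuid.UUID(start_from_checkpoint))
        except (ValueError, TypeError, AttributeError) as exc:
            raise ValueError(
                f'start_from_checkpoint must be "latest" or a UUID, got {start_from_checkpoint!r}'
            ) from exc
    return {
        "bucket": bucket,
        "start_from_checkpoint": start_from_checkpoint,
        "fail_if_no_checkpoint": fail_if_no_checkpoint,
        "standby": standby,
        "pull_interval": pull_interval,  # seconds between fetches while in standby
    }
```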
Standby mode behavior:
When standby is enabled:
- If start_from_checkpoint is "latest", the pipeline continuously fetches the latest checkpoint until it is activated.
- If start_from_checkpoint is a UUID, the pipeline fetches that specific checkpoint once and waits for activation.
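The two standby behaviors can be modeled as a loop. This toy sketch is not Feldera's implementation. fetch and is_activated are hypothetical callables. They stand in for downloading a checkpoint and for the activation triggered by pipeline.activate(). sleep is injectable so the example runs instantly.

```python
import time


def standby(start_from_checkpoint, fetch, is_activated,
            pull_interval_s=10.0, sleep=time.sleep):
    """Toy model of standby mode; returns the checkpoint to bootstrap from."""
    if start_from_checkpoint == "latest":
        checkpoint = fetch("latest")
        while not is_activated():
            sleep(pull_interval_s)
            checkpoint = fetch("latest")  # keep following the newest checkpoint
        return checkpoint
    checkpoint = fetch(start_from_checkpoint)  # a specific UUID is fetched once
    while not is_activated():
        sleep(pull_interval_s)
    return checkpoint


# Demo: the bucket has a newer checkpoint on every pull; activation
# arrives on the third check.
newest = iter(["ckpt-1", "ckpt-2", "ckpt-3"])
activations = iter([False, False, True])
print(standby("latest", lambda _: next(newest), lambda: next(activations),
              sleep=lambda _: None))  # ckpt-3
```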
Activate the pipeline to proceed with bootstrapping:
```python
pipeline.activate(wait=True, timeout_s=300)
```
Read-Only Checkpoint Bucket
Specify a read-only bucket as a fallback checkpoint source:
```json
{
  "storage": {
    "backend": {
      "name": "file",
      "config": {
        "sync": {
          "bucket": "my-bucket/checkpoints",
          "read_bucket": "backup-bucket/checkpoints"
        }
      }
    }
  }
}
```
When the pipeline has no local checkpoint and the primary bucket contains no checkpoint, it attempts to fetch one from read_bucket. The pipeline never writes to the read-only bucket.
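The fallback rule above can be written as a short function. It is a sketch of the documented rule, not Feldera code. Each argument is the checkpoint found in that location, or None. The source only says that read_bucket is consulted when both the local store and the primary bucket are empty. Two details are assumptions: preferring a local checkpoint over the primary bucket, and starting without a checkpoint when nothing is found and fail_if_no_checkpoint is false.

```python
def choose_checkpoint_source(local, bucket, read_bucket, fail_if_no_checkpoint=False):
    """Return (source, checkpoint) following the documented fallback rule.

    Each argument is the checkpoint found in that location, or None.
    """
    # Assumption: a local checkpoint is preferred over the primary bucket.
    for source, checkpoint in (("local", local), ("bucket", bucket)):
        if checkpoint is not None:
            return source, checkpoint
    # read_bucket is consulted only when neither location has a checkpoint;
    # it is read from but never written to.
    if read_bucket is not None:
        return "read_bucket", read_bucket
    if fail_if_no_checkpoint:
        raise RuntimeError("no checkpoint available")
    return None, None  # assumption: start without a checkpoint


print(choose_checkpoint_source(None, None, "ckpt-from-backup"))  # ('read_bucket', 'ckpt-from-backup')
```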
Connector Support
Fault tolerance requires support from input and output connectors. Not all connectors support fault tolerance:
Connectors with fault tolerance support:
- Kafka input/output
- HTTP input (with exactly-once model)
- File input/output
- S3 input
- Delta Lake input/output
Connectors without fault tolerance support:
- HTTP output
- Some streaming connectors (check connector documentation)
When using connectors without fault tolerance support, the pipeline can still create checkpoints, but recovery may not be fully consistent.
Performance Considerations
Fault tolerance introduces some overhead:
- Checkpoint creation: creating checkpoints requires writing state to storage, which takes time and I/O bandwidth
- Logging: write-ahead logging adds latency to input processing
- Storage space: checkpoints and logs consume disk space
To minimize overhead:
- Increase checkpoint_interval_secs if you can tolerate longer recovery times
- Use fast storage (NVMe SSDs) for the checkpoint directory
- Enable compression in the storage configuration
- Use the at_least_once model if you can tolerate duplicate outputs
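A back-of-the-envelope estimate helps gauge the recovery-time side of the first tradeoff. Recovery replays the inputs received after the last checkpoint, so the amount of replay grows with checkpoint_interval_secs. The sketch below makes three assumptions: a steady input rate, crashes equally likely at any moment between checkpoints, and checkpoints that complete instantly. The rate of 50,000 records per second is an arbitrary example.

```python
def replay_estimate(checkpoint_interval_secs, records_per_sec):
    """Return (worst_case, average) number of input records replayed after a crash.

    Assumes a steady input rate, instantaneous checkpoints, and a crash time
    uniformly distributed between two checkpoints.
    """
    worst = checkpoint_interval_secs * records_per_sec  # crash just before a checkpoint
    average = worst / 2                                  # crash halfway between checkpoints
    return worst, average


for interval in (10, 60, 300):
    worst, average = replay_estimate(interval, records_per_sec=50_000)
    print(f"interval {interval:>3}s: up to {worst:,} records, about {average:,.0f} on average")
```

Under these assumptions, raising the interval from 60 to 300 seconds makes the worst-case replay five times larger, in exchange for checkpoint I/O happening five times less often.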
What's Next
- Pipeline Configuration: Learn about all available configuration options
- Pipeline Lifecycle: Understand pipeline states and transitions
- Checkpoint Sync: Deep dive into syncing checkpoints to object storage