Last Updated: 3/19/2026
Python SDK
The Feldera Python SDK provides a programmatic interface for creating, managing, and interacting with Feldera pipelines. This guide covers installation, core concepts, and common workflows.
Installation
Install the Feldera Python SDK using pip or uv:
pip install feldera
Or with uv:
uv pip install feldera
Core Classes
The SDK provides three main classes for working with Feldera:
FelderaClient
The FelderaClient class manages the connection to the Feldera API server. It handles authentication and provides low-level access to the Feldera REST API.
from feldera import FelderaClient
# Connect to local Feldera instance (default)
client = FelderaClient()
# Connect to a remote instance with authentication
client = FelderaClient(
    url="https://try.feldera.com",
    api_key="apikey:your_key_here"
)
# Connect with custom TLS certificate
client = FelderaClient(
    url="https://localhost:8080",
    api_key="apikey:your_key_here",
    requests_verify="/path/to/tls.crt"
)
You can also configure the client using environment variables:
export FELDERA_HOST="https://localhost:8080"
export FELDERA_API_KEY="apikey:your_key_here"
export FELDERA_HTTPS_TLS_CERT="/path/to/tls.crt"
PipelineBuilder
The PipelineBuilder class helps you create new pipelines. It takes SQL code and configuration, then creates or updates a pipeline in Feldera.
from feldera import PipelineBuilder
sql = """
CREATE TABLE events (
    id BIGINT NOT NULL PRIMARY KEY,
    message VARCHAR
) WITH ('materialized' = 'true');
CREATE MATERIALIZED VIEW event_count AS
    SELECT COUNT(*) AS total FROM events;
"""
# Create a new pipeline
pipeline = PipelineBuilder(client, "my_pipeline", sql).create_or_replace()
The create_or_replace() method will delete any existing pipeline with the same name and create a new one. Use create() if you want to fail when a pipeline already exists.
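Which of the two methods to call often depends on the environment: replacing is convenient while iterating on SQL, whereas failing on a name clash is safer against a shared instance. The following is a minimal sketch of making that choice explicit. `build_pipeline` and `replace_existing` are hypothetical names, not part of the SDK, and the only SDK calls assumed are the `create()` and `create_or_replace()` methods described above.

```python
def build_pipeline(builder, replace_existing=False):
    """Create a pipeline from a builder (hypothetical helper, not part of the SDK).

    `builder` is any object exposing create() and create_or_replace(),
    such as a PipelineBuilder instance.
    """
    if replace_existing:
        # Deletes any pipeline with the same name, then creates a new one.
        return builder.create_or_replace()
    # Fails if a pipeline with the same name already exists.
    return builder.create()
```

A call might look like `build_pipeline(PipelineBuilder(client, "my_pipeline", sql), replace_existing=True)`.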
Pipeline
The Pipeline class represents an existing pipeline and provides methods to control its lifecycle, push data, query results, and monitor status.
from feldera import Pipeline
# Get an existing pipeline
pipeline = Pipeline.get("my_pipeline", client)
# Or get all pipelines
pipelines = Pipeline.all(client)
Creating and Managing Pipelines
Basic Pipeline Creation
Here’s a complete example of creating and running a pipeline:
from feldera import FelderaClient, PipelineBuilder
from feldera.enums import PipelineStatus
client = FelderaClient()
sql = """
CREATE TABLE orders (
    order_id BIGINT NOT NULL PRIMARY KEY,
    customer_id BIGINT,
    amount DECIMAL(10, 2)
) WITH ('materialized' = 'true');
CREATE MATERIALIZED VIEW total_revenue AS
    SELECT SUM(amount) AS revenue FROM orders;
"""
# Create and start the pipeline
pipeline = PipelineBuilder(client, "revenue_pipeline", sql).create_or_replace()
pipeline.start()
# Wait for the pipeline to be running
pipeline.wait_for_status(PipelineStatus.RUNNING, timeout=60)
Pipeline Lifecycle Control
Control pipeline execution with these methods:
# Start the pipeline
pipeline.start()
# Start in paused state
pipeline.start_paused()
# Pause a running pipeline
pipeline.pause()
# Resume a paused pipeline
pipeline.resume()
# Stop the pipeline (with checkpoint)
pipeline.stop(force=False)
# Stop immediately (no checkpoint)
pipeline.stop(force=True)
# Restart the pipeline
pipeline.restart()
Check Pipeline Status
from feldera.enums import PipelineStatus
# Get current status
status = pipeline.status()
if status == PipelineStatus.RUNNING:
    print("Pipeline is running")
elif status == PipelineStatus.PAUSED:
    print("Pipeline is paused")
# Wait for a specific status
pipeline.wait_for_status(PipelineStatus.RUNNING, timeout=60)
Pushing Data to Pipelines
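Rows are pushed as plain dictionaries keyed by column name. In the insert/delete update format used later in this section, each row is wrapped in a single-key object naming the operation. The following is a minimal sketch of a helper that builds such a payload. `to_insert_delete` is a hypothetical name, not part of the SDK, and the `delete` key is an assumption based on the name of the format; only the `insert` form appears in this guide.

```python
def to_insert_delete(inserts=(), deletes=()):
    """Wrap rows for the insert/delete update format (hypothetical helper).

    Each row is a dict keyed by column name. The "delete" key is an
    assumption; this guide only shows the "insert" form.
    """
    changes = [{"insert": row} for row in inserts]
    changes += [{"delete": row} for row in deletes]
    return changes


changes = to_insert_delete(
    inserts=[{"order_id": 4, "customer_id": 103, "amount": 199.99}],
    deletes=[{"order_id": 1, "customer_id": 100, "amount": 99.99}],
)
```

The result could then be passed as `pipeline.input_json("orders", changes, update_format="insert_delete")`, assuming a list is accepted in this format the same way it is for plain records.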
Push JSON Data
# Push a single record
pipeline.input_json("orders", {
    "order_id": 1,
    "customer_id": 100,
    "amount": 99.99
})
# Push multiple records
pipeline.input_json("orders", [
    {"order_id": 2, "customer_id": 101, "amount": 149.99},
    {"order_id": 3, "customer_id": 102, "amount": 79.99}
])
# Push with insert/delete format
pipeline.input_json("orders", {
    "insert": {"order_id": 4, "customer_id": 103, "amount": 199.99}
}, update_format="insert_delete")
Push Pandas DataFrames
import pandas as pd
# Create a DataFrame
df = pd.DataFrame({
    "order_id": [5, 6, 7],
    "customer_id": [104, 105, 106],
    "amount": [299.99, 399.99, 499.99]
})
# Push to the pipeline
pipeline.input_pandas("orders", df)
Querying Pipeline Results
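The query methods in this section take an ad-hoc SQL string. Because `pipeline.query()` is described here as a generator of row dictionaries, a result can be consumed incrementally instead of being loaded all at once. The following is a minimal sketch of taking only the first few rows. `take_rows` is a hypothetical helper, not part of the SDK, and it works on any Python iterator.

```python
from itertools import islice


def take_rows(rows, limit):
    """Return at most `limit` rows from an iterable (hypothetical helper).

    Rows beyond the limit are never pulled from the underlying iterator.
    """
    return list(islice(rows, limit))
```

With a running pipeline this might be used as `take_rows(pipeline.query("SELECT * FROM orders"), 10)`.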
Query as JSON
# Execute a query and iterate over results
for row in pipeline.query("SELECT * FROM total_revenue"):
    print(f"Total revenue: {row['revenue']}")
# query() returns a generator; call list() on it only when the full result fits in memory
results = list(pipeline.query("SELECT * FROM orders WHERE amount > 100"))
Query as Parquet
# Save query results to a Parquet file
pipeline.query_parquet("SELECT * FROM orders", "orders.parquet")
Query as Tabular Text
# Get results as formatted text
for line in pipeline.query_tabular("SELECT * FROM total_revenue"):
    print(line)
Execute Queries Without Results
# Execute INSERT or DELETE statements
pipeline.execute("INSERT INTO orders VALUES (8, 107, 599.99)")
pipeline.execute("DELETE FROM orders WHERE order_id = 1")
Listening to Output Streams
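This section shows two ways to consume changes from a view: iterating over a handler and registering a callback. In the callback form, the function receives a chunk and its sequence number. When state has to be kept across chunks, a callable object is one option. The following is a minimal sketch under that assumption. `ChunkCollector` is a hypothetical class, not part of the SDK, and it relies only on `len(chunk)`, which works for a pandas DataFrame as well as for a list.

```python
class ChunkCollector:
    """Callable that records every chunk passed to it (hypothetical helper)."""

    def __init__(self):
        self.chunks = []      # list of (seq_no, chunk) pairs in arrival order
        self.total_rows = 0   # running count of rows seen so far

    def __call__(self, chunk, seq_no):
        # Same (chunk, seq_no) signature as the callback example in this section.
        self.chunks.append((seq_no, chunk))
        self.total_rows += len(chunk)
```

An instance could be registered with `pipeline.foreach_chunk("total_revenue", collector)` and inspected afterwards through `collector.total_rows`.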
Using OutputHandler
# Listen to changes in a view
handler = pipeline.listen("total_revenue")
# Process changes as they arrive
for chunk in handler:
    print(f"Received {len(chunk)} changes")
    for change in chunk:
        print(change)
# Stop listening
handler.stop()
Using Callbacks
import pandas as pd
def process_chunk(chunk: pd.DataFrame, seq_no: int):
    print(f"Processing chunk {seq_no} with {len(chunk)} rows")
    print(chunk)
# Process each chunk with a callback
pipeline.foreach_chunk("total_revenue", process_chunk)
Working with Connectors
Control Input Connectors
# Pause an input connector
pipeline.pause_connector("orders", "kafka_source")
# Resume an input connector
pipeline.resume_connector("orders", "kafka_source")
# Get connector statistics
stats = pipeline.input_connector_stats("orders", "kafka_source")
print(f"Records processed: {stats.total_records}")
Get Output Connector Statistics
stats = pipeline.output_connector_stats("total_revenue", "kafka_sink")
print(f"Records transmitted: {stats.transmitted_records}")
Pipeline Statistics and Monitoring
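The SDK's own wait methods, shown later in this section, cover the common cases. For a condition they do not cover, such as a threshold on one of the statistics, a generic polling loop with a timeout is one option. The following is a minimal sketch. `wait_until` and its parameters are hypothetical names, not part of the SDK, and the helper works with any zero-argument callable.

```python
import time


def wait_until(predicate, timeout_s=60.0, poll_interval_s=1.0):
    """Poll `predicate` until it returns a truthy value or the timeout expires.

    Returns True if the predicate succeeded, False on timeout.
    Hypothetical helper, not part of the SDK.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval_s)
```

With a pipeline object this might be called as `wait_until(pipeline.is_complete, timeout_s=300)`.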
Get Pipeline Statistics
stats = pipeline.stats()
# Global metrics
print(f"Total input records: {stats.global_metrics.total_input_records}")
print(f"Total processed records: {stats.global_metrics.total_processed_records}")
print(f"Pipeline complete: {stats.global_metrics.pipeline_complete}")
# Check whether the pipeline has processed all of its inputs
if pipeline.is_complete():
    print("Pipeline has processed all inputs")
Wait for Pipeline Completion
# Wait for all inputs to be processed
pipeline.wait_for_completion(timeout_s=300)
# Wait for pipeline to become idle
pipeline.wait_for_idle(idle_interval_s=5.0, timeout_s=60.0)
Get Pipeline Logs
# Stream pipeline logs
for log_line in pipeline.logs():
    print(log_line)
Transactions
Feldera supports transactions for atomic batch processing:
# Start a transaction
txn_id = pipeline.start_transaction()
# Push data within the transaction
pipeline.input_json("orders", {"order_id": 9, "customer_id": 108, "amount": 699.99})
pipeline.input_json("orders", {"order_id": 10, "customer_id": 109, "amount": 799.99})
# Commit the transaction
pipeline.commit_transaction(txn_id, wait=True)
# Check transaction status
status = pipeline.transaction_status()
Checkpointing (Enterprise Feature)
# Create a checkpoint
seq = pipeline.checkpoint(wait=True, timeout_s=60)
# Check checkpoint status
status = pipeline.checkpoint_status(seq)
# Sync checkpoint to object store
uuid = pipeline.sync_checkpoint(wait=True, timeout_s=120)
# Get checkpoint metadata
checkpoints = pipeline.checkpoints()
for cp in checkpoints:
    print(f"Checkpoint {cp.sequence_number} at {cp.timestamp}")
Modifying Pipelines
# Modify pipeline SQL
new_sql = """
CREATE TABLE orders (
    order_id BIGINT NOT NULL PRIMARY KEY,
    customer_id BIGINT,
    amount DECIMAL(10, 2),
    order_date TIMESTAMP
) WITH ('materialized' = 'true');
"""
pipeline.modify(sql=new_sql)
# Modify runtime configuration
from feldera.runtime_config import RuntimeConfig
runtime_config = pipeline.runtime_config()
runtime_config.workers = 8
pipeline.set_runtime_config(runtime_config)
Cleaning Up
# Stop and delete the pipeline
pipeline.stop(force=True)
pipeline.delete()
# Or stop, clear storage, and delete
pipeline.stop(force=True)
pipeline.clear_storage(wait=True)
pipeline.delete()
Error Handling
from feldera import Pipeline
from feldera.rest.errors import FelderaAPIError
try:
    pipeline = Pipeline.get("nonexistent_pipeline", client)
except FelderaAPIError as e:
    print(f"Error: {e.message}")
    print(f"Status code: {e.status_code}")
Complete Example
Here’s a complete example that ties everything together:
from feldera import FelderaClient, PipelineBuilder
from feldera.enums import PipelineStatus
import time
# Initialize client
client = FelderaClient()
# Define SQL program
sql = """
CREATE TABLE sensor_data (
    sensor_id BIGINT NOT NULL,
    temperature DECIMAL(5, 2),
    timestamp TIMESTAMP
) WITH ('materialized' = 'true');
CREATE MATERIALIZED VIEW avg_temperature AS
    SELECT sensor_id, AVG(temperature) AS avg_temp
    FROM sensor_data
    GROUP BY sensor_id;
"""
# Create and start pipeline
pipeline = PipelineBuilder(client, "sensor_pipeline", sql).create_or_replace()
pipeline.start()
pipeline.wait_for_status(PipelineStatus.RUNNING)
# Push some data
pipeline.input_json("sensor_data", [
    {"sensor_id": 1, "temperature": 22.5, "timestamp": "2024-01-01T10:00:00"},
    {"sensor_id": 2, "temperature": 23.1, "timestamp": "2024-01-01T10:00:00"},
    {"sensor_id": 1, "temperature": 22.8, "timestamp": "2024-01-01T10:05:00"}
])
# Wait for processing
time.sleep(2)
# Query results
for row in pipeline.query("SELECT * FROM avg_temperature ORDER BY sensor_id"):
    print(f"Sensor {row['sensor_id']}: {row['avg_temp']}°C")
# Clean up
pipeline.stop(force=True)
pipeline.delete()
What’s Next
- CLI Reference: Learn how to manage pipelines from the command line using the fda tool
- Connectors Overview: Discover how to connect pipelines to external data sources and sinks
- REST API Overview: Explore the full REST API for programmatic pipeline management and integration