
Last Updated: 3/19/2026


Python SDK

The Feldera Python SDK provides a programmatic interface for creating, managing, and interacting with Feldera pipelines. This guide covers installation, core concepts, and common workflows.

Installation

Install the Feldera Python SDK using pip or uv:

pip install feldera

Or with uv:

uv pip install feldera

Core Classes

The SDK provides three main classes for working with Feldera:

FelderaClient

The FelderaClient class manages the connection to the Feldera API server. It handles authentication and provides low-level access to the Feldera REST API.

from feldera import FelderaClient

# Connect to local Feldera instance (default)
client = FelderaClient()

# Connect to a remote instance with authentication
client = FelderaClient(
    url="https://try.feldera.com",
    api_key="apikey:your_key_here",
)

# Connect with custom TLS certificate
client = FelderaClient(
    url="https://localhost:8080",
    api_key="apikey:your_key_here",
    requests_verify="/path/to/tls.crt",
)

You can also configure the client using environment variables:

export FELDERA_HOST="https://localhost:8080"
export FELDERA_API_KEY="apikey:your_key_here"
export FELDERA_HTTPS_TLS_CERT="/path/to/tls.crt"
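To make the relationship between these variables and the constructor arguments concrete, here is a small sketch that resolves connection settings in the order you might expect: explicit arguments first, then the environment variables, then a local default. The helper `resolve_settings`, that precedence order, and the `http://localhost:8080` fallback are illustrative assumptions, not documented SDK behavior.

```python
import os
from typing import Mapping, Optional, Union


def resolve_settings(
    url: Optional[str] = None,
    api_key: Optional[str] = None,
    requests_verify: Union[bool, str, None] = None,
    env: Mapping[str, str] = os.environ,
) -> dict:
    """Hypothetical helper: explicit arguments win, then env vars, then defaults."""
    if requests_verify is None:
        # A certificate path from the environment replaces default TLS verification.
        requests_verify = env.get("FELDERA_HTTPS_TLS_CERT", True)
    return {
        # Assumed fallback when neither an argument nor FELDERA_HOST is given.
        "url": url or env.get("FELDERA_HOST", "http://localhost:8080"),
        # None here means no API key was configured.
        "api_key": api_key or env.get("FELDERA_API_KEY"),
        "requests_verify": requests_verify,
    }
```

Because `url`, `api_key`, and `requests_verify` are the constructor arguments used in the client examples, the result could be passed straight through with `FelderaClient(**resolve_settings())`.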

PipelineBuilder

The PipelineBuilder class helps you create new pipelines. It takes SQL code and configuration, then creates or updates a pipeline in Feldera.

from feldera import PipelineBuilder

sql = """
CREATE TABLE events (
    id BIGINT NOT NULL PRIMARY KEY,
    message VARCHAR
) WITH ('materialized' = 'true');

CREATE MATERIALIZED VIEW event_count AS
SELECT COUNT(*) AS total FROM events;
"""

# Create a new pipeline
pipeline = PipelineBuilder(client, "my_pipeline", sql).create_or_replace()

The create_or_replace() method will delete any existing pipeline with the same name and create a new one. Use create() instead if you want the call to fail when a pipeline with that name already exists.

Pipeline

The Pipeline class represents an existing pipeline and provides methods to control its lifecycle, push data, query results, and monitor status.

from feldera import Pipeline

# Get an existing pipeline
pipeline = Pipeline.get("my_pipeline", client)

# Or get all pipelines
pipelines = Pipeline.all(client)

Creating and Managing Pipelines

Basic Pipeline Creation

Here’s a complete example of creating and running a pipeline:

from feldera import FelderaClient, PipelineBuilder
from feldera.enums import PipelineStatus

client = FelderaClient()

sql = """
CREATE TABLE orders (
    order_id BIGINT NOT NULL PRIMARY KEY,
    customer_id BIGINT,
    amount DECIMAL(10, 2)
) WITH ('materialized' = 'true');

CREATE MATERIALIZED VIEW total_revenue AS
SELECT SUM(amount) AS revenue FROM orders;
"""

# Create and start the pipeline
pipeline = PipelineBuilder(client, "revenue_pipeline", sql).create_or_replace()
pipeline.start()

# Wait for the pipeline to be running
pipeline.wait_for_status(PipelineStatus.RUNNING, timeout=60)

Pipeline Lifecycle Control

Control pipeline execution with these methods:

# Start the pipeline
pipeline.start()

# Start in paused state
pipeline.start_paused()

# Pause a running pipeline
pipeline.pause()

# Resume a paused pipeline
pipeline.resume()

# Stop the pipeline (with checkpoint)
pipeline.stop(force=False)

# Stop immediately (no checkpoint)
pipeline.stop(force=True)

# Restart the pipeline
pipeline.restart()
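The comments on these methods imply a small set of state transitions. The sketch below encodes them as a lookup table so you can check which call is meaningful from which state. It is a deliberately simplified, hypothetical model: the state names, `TRANSITIONS`, and `next_state` are illustrative, the real pipeline passes through intermediate states while starting and stopping, and `restart()` is modeled here simply as a stop followed by a start.

```python
# Simplified, hypothetical model of the lifecycle methods listed above.
# Keys are (current_state, method_name); values are the resulting state.
TRANSITIONS = {
    ("stopped", "start"): "running",
    ("stopped", "start_paused"): "paused",
    ("running", "pause"): "paused",
    ("paused", "resume"): "running",
    ("running", "stop"): "stopped",
    ("paused", "stop"): "stopped",
    # restart() modeled as stop() followed by start().
    ("running", "restart"): "running",
    ("paused", "restart"): "running",
}


def next_state(state: str, method: str) -> str:
    """Return the modeled state after calling `method`, or raise if invalid."""
    try:
        return TRANSITIONS[(state, method)]
    except KeyError:
        raise ValueError(f"{method}() is not valid in state {state!r}") from None
```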

Check Pipeline Status

from feldera.enums import PipelineStatus

# Get current status
status = pipeline.status()
if status == PipelineStatus.RUNNING:
    print("Pipeline is running")
elif status == PipelineStatus.PAUSED:
    print("Pipeline is paused")

# Wait for a specific status
pipeline.wait_for_status(PipelineStatus.RUNNING, timeout=60)
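Waiting for a status with a timeout boils down to a polling loop. The sketch below shows that pattern as a generic helper; `wait_until` is hypothetical and is not part of the SDK, and the poll interval is an arbitrary choice. You could, for example, call it as `wait_until(pipeline.status, lambda s: s == PipelineStatus.RUNNING, timeout_s=60)`.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def wait_until(
    read: Callable[[], T],
    is_done: Callable[[T], bool],
    timeout_s: float,
    poll_interval_s: float = 0.5,
) -> T:
    """Poll read() until is_done(value) is true, or raise TimeoutError."""
    # monotonic() is unaffected by wall-clock adjustments.
    deadline = time.monotonic() + timeout_s
    while True:
        value = read()
        if is_done(value):
            return value
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"condition not met within {timeout_s}s (last value: {value!r})"
            )
        time.sleep(poll_interval_s)
```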

Pushing Data to Pipelines

Push JSON Data

# Push a single record
pipeline.input_json("orders", {
    "order_id": 1,
    "customer_id": 100,
    "amount": 99.99
})

# Push multiple records
pipeline.input_json("orders", [
    {"order_id": 2, "customer_id": 101, "amount": 149.99},
    {"order_id": 3, "customer_id": 102, "amount": 79.99}
])

# Push with insert/delete format
pipeline.input_json("orders", {
    "insert": {"order_id": 4, "customer_id": 103, "amount": 199.99}
}, update_format="insert_delete")
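With `update_format="insert_delete"`, each record is wrapped in an object whose single key says what to do with the row. The sketch below builds such a batch from separate lists of rows to insert and delete. The helper `insert_delete_batch` is hypothetical, and it assumes that a `{"delete": {...}}` wrapper mirrors the `{"insert": {...}}` form shown above and that `input_json` accepts a list of these envelopes the same way it accepts a list of plain records.

```python
from typing import Any, Dict, Iterable, List

Row = Dict[str, Any]


def insert_delete_batch(
    inserts: Iterable[Row] = (), deletes: Iterable[Row] = ()
) -> List[Dict[str, Row]]:
    """Wrap rows in insert/delete envelopes (hypothetical helper)."""
    # Each update is an object with exactly one key: "delete" or "insert".
    batch: List[Dict[str, Row]] = [{"delete": row} for row in deletes]
    batch += [{"insert": row} for row in inserts]
    return batch
```

Usage might look like `pipeline.input_json("orders", insert_delete_batch(inserts=[...], deletes=[...]), update_format="insert_delete")`.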

Push Pandas DataFrames

import pandas as pd

# Create a DataFrame
df = pd.DataFrame({
    "order_id": [5, 6, 7],
    "customer_id": [104, 105, 106],
    "amount": [299.99, 399.99, 499.99]
})

# Push to the pipeline
pipeline.input_pandas("orders", df)
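A DataFrame is column-oriented, while `input_json` takes row records. To show how the two relate, the pure-Python sketch below converts the same column data into the list of row dicts you could pass to `input_json` instead. `columns_to_rows` is a hypothetical helper; with pandas installed, `df.to_dict(orient="records")` produces a list of the same shape.

```python
from typing import Any, Dict, List, Sequence


def columns_to_rows(columns: Dict[str, Sequence[Any]]) -> List[Dict[str, Any]]:
    """Turn {"col": [v1, v2, ...]} into [{"col": v1}, {"col": v2}, ...]."""
    lengths = {len(values) for values in columns.values()}
    if len(lengths) > 1:
        raise ValueError("all columns must have the same length")
    names = list(columns)
    # zip(*...) walks the columns in lockstep, yielding one tuple per row.
    return [dict(zip(names, values)) for values in zip(*columns.values())]
```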

Querying Pipeline Results

Query as JSON

# Execute a query and iterate over results
for row in pipeline.query("SELECT * FROM total_revenue"):
    print(f"Total revenue: {row['revenue']}")

# query() returns a generator, so large results can be processed row by row.
# Wrapping it in list() loads every row into memory; do this only for small results.
results = list(pipeline.query("SELECT * FROM orders WHERE amount > 100"))
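Because `query()` yields rows lazily, you can aggregate a large result without holding it all in memory. In the sketch below, `fake_query` is a hypothetical stand-in that yields dict rows the way the loop above consumes them; in real code you would pass `pipeline.query(...)` to `sum_large_orders` instead. Converting through `str()` into `Decimal` is a design choice that keeps `DECIMAL(10, 2)` amounts from picking up float rounding error in the running total.

```python
from decimal import Decimal
from typing import Any, Iterable, Iterator, Mapping


def fake_query() -> Iterator[Mapping[str, Any]]:
    """Hypothetical stand-in for pipeline.query("SELECT * FROM orders")."""
    yield {"order_id": 2, "customer_id": 101, "amount": 149.99}
    yield {"order_id": 3, "customer_id": 102, "amount": 79.99}
    yield {"order_id": 5, "customer_id": 104, "amount": 299.99}


def sum_large_orders(rows: Iterable[Mapping[str, Any]], threshold: Decimal) -> Decimal:
    """Consume rows one at a time; only the running total is kept in memory."""
    total = Decimal("0")
    for row in rows:
        amount = Decimal(str(row["amount"]))  # str() avoids float artifacts
        if amount > threshold:
            total += amount
    return total
```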

Query as Parquet

# Save query results to a Parquet file
pipeline.query_parquet("SELECT * FROM orders", "orders.parquet")

Query as Tabular Text

# Get results as formatted text
for line in pipeline.query_tabular("SELECT * FROM total_revenue"):
    print(line)

Execute Queries Without Results

# Execute INSERT or DELETE statements
pipeline.execute("INSERT INTO orders VALUES (8, 107, 599.99)")
pipeline.execute("DELETE FROM orders WHERE order_id = 1")
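Since `execute()` takes a SQL string, inserting values from Python means rendering them as SQL literals. The sketch below is a minimal, hypothetical helper for that (`sql_literal` and `insert_statement` are not SDK functions). It assumes the ad-hoc query engine accepts an explicit column list in `INSERT`, handles only a few scalar types, and treats the table and column names as trusted identifiers; it escapes string values but is not a general defense against untrusted input.

```python
import math
from decimal import Decimal
from typing import Any, Mapping


def sql_literal(value: Any) -> str:
    """Render a Python value as a SQL literal (hypothetical, minimal helper)."""
    if value is None:
        return "NULL"
    if isinstance(value, bool):  # check bool before int: bool is an int subclass
        return "TRUE" if value else "FALSE"
    if isinstance(value, (int, float, Decimal)):
        if isinstance(value, (float, Decimal)) and not math.isfinite(value):
            raise ValueError("NaN and infinity have no SQL literal form")
        return str(value)
    if isinstance(value, str):
        return "'" + value.replace("'", "''") + "'"  # double embedded quotes
    raise TypeError(f"unsupported type: {type(value).__name__}")


def insert_statement(table: str, row: Mapping[str, Any]) -> str:
    """Build an INSERT with explicit column names from a row dict."""
    columns = ", ".join(row)
    values = ", ".join(sql_literal(v) for v in row.values())
    return f"INSERT INTO {table} ({columns}) VALUES ({values})"
```

The result could then be passed to `pipeline.execute(...)`.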

Listening to Output Streams

Using OutputHandler

# Listen to changes in a view
handler = pipeline.listen("total_revenue")

# Process changes as they arrive
for chunk in handler:
    print(f"Received {len(chunk)} changes")
    for change in chunk:
        print(change)

# Stop listening
handler.stop()
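A common use of a change stream is keeping a local copy of a view up to date. The sketch below assumes each change is a dict with either an `"insert"` or a `"delete"` key wrapping a row, the same envelope as the `insert_delete` input format; the records the handler actually yields may be shaped differently, so treat that format and the hypothetical `apply_changes` helper as assumptions. Rows are counted in a multiset because a view can contain duplicate rows.

```python
from collections import Counter
from typing import Any, Dict, Iterable, Tuple

RowKey = Tuple[Tuple[str, Any], ...]


def row_key(row: Dict[str, Any]) -> RowKey:
    """Make a hashable, column-order-independent key from a row dict."""
    return tuple(sorted(row.items()))


def apply_changes(state: Counter, changes: Iterable[Dict[str, Dict[str, Any]]]) -> Counter:
    """Apply insert/delete changes to a multiset of rows (hypothetical helper)."""
    for change in changes:
        if "insert" in change:
            state[row_key(change["insert"])] += 1
        elif "delete" in change:
            key = row_key(change["delete"])
            state[key] -= 1
            if state[key] <= 0:
                del state[key]  # drop rows whose count reaches zero
        else:
            raise ValueError(f"unrecognized change: {change!r}")
    return state
```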

Using Callbacks

import pandas as pd

def process_chunk(chunk: pd.DataFrame, seq_no: int):
    print(f"Processing chunk {seq_no} with {len(chunk)} rows")
    print(chunk)

# Process each chunk with a callback
pipeline.foreach_chunk("total_revenue", process_chunk)
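The callback only needs to accept `(chunk, seq_no)`, so a callable object works when you want to accumulate state across chunks. The sketch below is a hypothetical `ChunkRecorder` that logs each chunk's sequence number and row count. It uses only `len(chunk)`, so plain lists work for testing, and it guards its state with a lock as a precaution in case callbacks arrive on a background thread. It could be registered with `pipeline.foreach_chunk("total_revenue", ChunkRecorder())`.

```python
import threading
from collections.abc import Sized
from typing import List, Tuple


class ChunkRecorder:
    """Callable that records (seq_no, row_count) for each chunk (hypothetical)."""

    def __init__(self) -> None:
        self.seen: List[Tuple[int, int]] = []
        # Precaution in case callbacks run on a different thread than readers.
        self._lock = threading.Lock()

    def __call__(self, chunk: Sized, seq_no: int) -> None:
        with self._lock:
            self.seen.append((seq_no, len(chunk)))

    def total_rows(self) -> int:
        with self._lock:
            return sum(count for _, count in self.seen)
```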

Working with Connectors

Control Input Connectors

# Pause an input connector
pipeline.pause_connector("orders", "kafka_source")

# Resume an input connector
pipeline.resume_connector("orders", "kafka_source")

# Get connector statistics
stats = pipeline.input_connector_stats("orders", "kafka_source")
print(f"Records processed: {stats.total_records}")

Get Output Connector Statistics

stats = pipeline.output_connector_stats("total_revenue", "kafka_sink")
print(f"Records transmitted: {stats.transmitted_records}")
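Connector counters such as `transmitted_records` are cumulative, so a rate needs two readings taken some time apart. The sketch below computes the average records per second between two readings; `Reading` and `records_per_second` are hypothetical names, and it assumes the counter only increases between readings. In practice you might take each reading as `Reading(time.monotonic(), stats.transmitted_records)`.

```python
from dataclasses import dataclass


@dataclass
class Reading:
    """A cumulative counter value captured at a point in time (hypothetical)."""
    time_s: float
    records: int


def records_per_second(earlier: Reading, later: Reading) -> float:
    """Average rate between two readings of a monotonically increasing counter."""
    elapsed = later.time_s - earlier.time_s
    if elapsed <= 0:
        raise ValueError("readings must be taken at increasing times")
    return (later.records - earlier.records) / elapsed
```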

Pipeline Statistics and Monitoring

Get Pipeline Statistics

stats = pipeline.stats()

# Global metrics
print(f"Total input records: {stats.global_metrics.total_input_records}")
print(f"Total processed records: {stats.global_metrics.total_processed_records}")
print(f"Pipeline complete: {stats.global_metrics.pipeline_complete}")

# Check whether the pipeline has processed all of its inputs
if pipeline.is_complete():
    print("Pipeline has processed all inputs")

Wait for Pipeline Completion

# Wait for all inputs to be processed
pipeline.wait_for_completion(timeout_s=300)

# Wait for pipeline to become idle
pipeline.wait_for_idle(idle_interval_s=5.0, timeout_s=60.0)
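One way to read `idle_interval_s` is as a quiet period: the pipeline counts as idle once its input and processed record counters are equal and have not changed for that long. The sketch below applies that rule to a list of `(time_s, input_records, processed_records)` samples. The rule itself is an assumption about what `wait_for_idle` checks, and `first_idle_time` is a hypothetical helper.

```python
from typing import List, Optional, Tuple

# (time_s, total_input_records, total_processed_records)
CounterSample = Tuple[float, int, int]


def first_idle_time(samples: List[CounterSample], idle_interval_s: float) -> Optional[float]:
    """Return the first sample time at which the assumed idle rule holds, else None."""
    quiet_since: Optional[float] = None
    previous: Optional[Tuple[int, int]] = None
    for time_s, inputs, processed in samples:
        counters = (inputs, processed)
        if inputs != processed:
            quiet_since = None  # still processing buffered input
        elif counters != previous:
            quiet_since = time_s  # just caught up: start timing the quiet period
        # Otherwise nothing changed and all input is processed; keep timing.
        previous = counters
        if quiet_since is not None and time_s - quiet_since >= idle_interval_s:
            return time_s
    return None
```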

Get Pipeline Logs

# Stream pipeline logs
for log_line in pipeline.logs():
    print(log_line)

Transactions

Feldera supports transactions for atomic batch processing:

# Start a transaction
txn_id = pipeline.start_transaction()

# Push data within the transaction
pipeline.input_json("orders", {"order_id": 9, "customer_id": 108, "amount": 699.99})
pipeline.input_json("orders", {"order_id": 10, "customer_id": 109, "amount": 799.99})

# Commit the transaction
pipeline.commit_transaction(txn_id, wait=True)

# Check transaction status
status = pipeline.transaction_status()
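The start/push/commit sequence fits naturally into a context manager. The sketch below is a hypothetical `transaction` wrapper built only on the `start_transaction()` and `commit_transaction(txn_id, wait=...)` calls used above. It commits when the `with` block finishes normally; if the block raises, the exception propagates and no commit is attempted. No abort call is described here, so the sketch does not try one, which means the transaction is left open in that case.

```python
from contextlib import contextmanager
from typing import Any, Iterator


@contextmanager
def transaction(pipeline: Any, wait: bool = True) -> Iterator[Any]:
    """Hypothetical wrapper: start a transaction, commit it if the block succeeds."""
    txn_id = pipeline.start_transaction()
    # An exception raised in the with-block surfaces at this yield and
    # propagates, so the commit below is skipped.
    yield txn_id
    pipeline.commit_transaction(txn_id, wait=wait)
```

Usage would look like `with transaction(pipeline): pipeline.input_json("orders", {...})`.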

Checkpointing (Enterprise Feature)

# Create a checkpoint
seq = pipeline.checkpoint(wait=True, timeout_s=60)

# Check checkpoint status
status = pipeline.checkpoint_status(seq)

# Sync checkpoint to object store
uuid = pipeline.sync_checkpoint(wait=True, timeout_s=120)

# Get checkpoint metadata
checkpoints = pipeline.checkpoints()
for cp in checkpoints:
    print(f"Checkpoint {cp.sequence_number} at {cp.timestamp}")

Modifying Pipelines

# Modify pipeline SQL
new_sql = """
CREATE TABLE orders (
    order_id BIGINT NOT NULL PRIMARY KEY,
    customer_id BIGINT,
    amount DECIMAL(10, 2),
    order_date TIMESTAMP
) WITH ('materialized' = 'true');
"""
pipeline.modify(sql=new_sql)

# Modify runtime configuration
from feldera.runtime_config import RuntimeConfig

runtime_config = pipeline.runtime_config()
runtime_config.workers = 8
pipeline.set_runtime_config(runtime_config)

Cleaning Up

# Stop and delete the pipeline
pipeline.stop(force=True)
pipeline.delete()

# Or stop, clear storage, and delete
pipeline.stop(force=True)
pipeline.clear_storage(wait=True)
pipeline.delete()

Error Handling

from feldera import Pipeline
from feldera.rest.errors import FelderaAPIError

try:
    pipeline = Pipeline.get("nonexistent_pipeline", client)
except FelderaAPIError as e:
    print(f"Error: {e.message}")
    print(f"Status code: {e.status_code}")
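Because API errors carry a `status_code`, you can decide which failures are worth retrying: a transient 503 while a server restarts might be, while a 404 for a missing pipeline is not. The sketch below retries with exponential backoff only when the status code is in an assumed retryable set. `call_with_retries` and the choice of codes are hypothetical; in real code you would pass `FelderaAPIError` as `error_type`, e.g. `call_with_retries(lambda: Pipeline.get("my_pipeline", client), FelderaAPIError)`.

```python
import time
from typing import Callable, Collection, Type, TypeVar

T = TypeVar("T")

# Assumed set of transient HTTP status codes; adjust for your deployment.
RETRYABLE = frozenset({502, 503, 504})


def call_with_retries(
    fn: Callable[[], T],
    error_type: Type[Exception],
    attempts: int = 3,
    base_delay_s: float = 0.5,
    retryable: Collection[int] = RETRYABLE,
) -> T:
    """Call fn(), retrying with exponential backoff on retryable status codes."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return fn()
        except error_type as err:
            status = getattr(err, "status_code", None)
            if status not in retryable or attempt == attempts - 1:
                raise  # permanent error, or out of attempts
            time.sleep(base_delay_s * 2 ** attempt)  # 0.5s, 1s, 2s, ...
    raise AssertionError("unreachable")  # the loop always returns or raises
```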

Complete Example

Here’s a complete example that ties everything together:

from feldera import FelderaClient, PipelineBuilder
from feldera.enums import PipelineStatus
import time

# Initialize client
client = FelderaClient()

# Define SQL program
# (the time column is named "ts" because TIMESTAMP is a reserved SQL keyword)
sql = """
CREATE TABLE sensor_data (
    sensor_id BIGINT NOT NULL,
    temperature DECIMAL(5, 2),
    ts TIMESTAMP
) WITH ('materialized' = 'true');

CREATE MATERIALIZED VIEW avg_temperature AS
SELECT sensor_id, AVG(temperature) AS avg_temp
FROM sensor_data
GROUP BY sensor_id;
"""

# Create and start pipeline
pipeline = PipelineBuilder(client, "sensor_pipeline", sql).create_or_replace()
pipeline.start()
pipeline.wait_for_status(PipelineStatus.RUNNING)

# Push some data
pipeline.input_json("sensor_data", [
    {"sensor_id": 1, "temperature": 22.5, "ts": "2024-01-01T10:00:00"},
    {"sensor_id": 2, "temperature": 23.1, "ts": "2024-01-01T10:00:00"},
    {"sensor_id": 1, "temperature": 22.8, "ts": "2024-01-01T10:05:00"}
])

# Wait for processing
time.sleep(2)

# Query results
for row in pipeline.query("SELECT * FROM avg_temperature ORDER BY sensor_id"):
    print(f"Sensor {row['sensor_id']}: {row['avg_temp']}°C")

# Clean up
pipeline.stop(force=True)
pipeline.delete()
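Instead of eyeballing the printed averages, you can compute what `avg_temperature` should contain for the pushed rows and compare. The sketch below does the `GROUP BY sensor_id` average in plain Python with `Decimal`. `expected_averages` is a hypothetical helper, and the scale of the `AVG` result Feldera returns may differ from this computation, so compare the values numerically rather than as strings.

```python
from collections import defaultdict
from decimal import Decimal
from typing import Any, Dict, Iterable, Mapping


def expected_averages(rows: Iterable[Mapping[str, Any]]) -> Dict[int, Decimal]:
    """Local equivalent of SELECT sensor_id, AVG(temperature) ... GROUP BY sensor_id."""
    sums: Dict[int, Decimal] = defaultdict(Decimal)
    counts: Dict[int, int] = defaultdict(int)
    for row in rows:
        # SQL AVG skips NULLs. A group with only NULLs is omitted here,
        # whereas SQL would return a NULL average for it.
        if row["temperature"] is None:
            continue
        sensor = row["sensor_id"]
        sums[sensor] += Decimal(str(row["temperature"]))
        counts[sensor] += 1
    return {sensor: sums[sensor] / counts[sensor] for sensor in sums}
```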

What’s Next

  • CLI Reference: Learn how to manage pipelines from the command line using the fda tool
  • Connectors Overview: Discover how to connect pipelines to external data sources and sinks
  • REST API Overview: Explore the full REST API for programmatic pipeline management and integration