Last Updated: 3/19/2026
HTTP Connector
The HTTP connector enables Feldera pipelines to receive data via HTTP POST requests (input) and stream data to HTTP clients (output). This connector is primarily used for testing, debugging, and browser-based applications.
HTTP Input
The HTTP input connector allows you to push data to a pipeline via HTTP POST requests. This connector is instantiated automatically when you use the pipeline API to push data.
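As a rough sketch of what such a push looks like at the HTTP level, the snippet below builds (but does not send) a POST request with Python's standard library. The `/ingress/` path, the query parameters, and the `build_ingress_request` helper are assumptions modelled on the egress URL shown in the browser example on this page; check the REST API reference before relying on them.

```python
import json
from urllib.parse import quote, urlencode
from urllib.request import Request

def build_ingress_request(base_url, pipeline_name, table_name, records):
    """Build a POST request carrying one JSON record per line.

    The URL layout is an assumption that mirrors the egress URL on this page.
    """
    query = urlencode({"format": "json", "update_format": "raw"})
    url = (
        f"{base_url}/v0/pipelines/{quote(pipeline_name)}"
        f"/ingress/{quote(table_name)}?{query}"
    )
    # Serialize each record as its own JSON document, separated by newlines
    body = "\n".join(json.dumps(r) for r in records).encode("utf-8")
    return Request(url, data=body, method="POST",
                   headers={"Content-Type": "application/json"})

req = build_ingress_request(
    "http://localhost:8080", "demo", "users",
    [{"id": 1, "name": "Alice", "age": 30}],
)
print(req.get_method(), req.full_url)
```

In practice the Python SDK calls shown in the next section hide these details.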
Pushing Data via API
Push JSON data to a table:
pipeline.input_json(
    table_name="users",
    data={"id": 1, "name": "Alice", "age": 30},
    update_format="raw"
)
Push an array of records:
pipeline.input_json(
    table_name="users",
    data=[
        {"id": 1, "name": "Alice", "age": 30},
        {"id": 2, "name": "Bob", "age": 25}
    ],
    update_format="raw"
)
Update Formats
The HTTP input connector supports two update formats:
Raw format (each record is an insert):
{"id": 1, "name": "Alice", "age": 30}
Insert/Delete format (inserts and deletes are specified explicitly):
{
    "insert": {"id": 1, "name": "Alice", "age": 30}
}
{
    "delete": {"id": 1, "name": "Alice", "age": 30}
}
Forcing Input on Paused Pipelines
By default, pushing data to a paused pipeline raises an error. Use the force flag to push data anyway:
pipeline.input_json(
    table_name="users",
    data={"id": 1, "name": "Alice"},
    force=True
)
Pushing Pandas DataFrames
Push a pandas DataFrame to a table:
import pandas as pd
df = pd.DataFrame({
    "id": [1, 2, 3],
    "name": ["Alice", "Bob", "Charlie"],
    "age": [30, 25, 35]
})
pipeline.input_pandas(table_name="users", df=df)
The DataFrame is automatically chunked and sent to the pipeline in batches.
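The batching idea can be sketched in plain Python. This only illustrates chunking and is not the SDK's implementation: the `chunked` helper and the batch size of 2 are hypothetical, and rows are plain dictionaries so the sketch runs without pandas.

```python
def chunked(rows, batch_size):
    """Yield consecutive slices of `rows`, each at most `batch_size` long."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]

rows = [
    {"id": 1, "name": "Alice", "age": 30},
    {"id": 2, "name": "Bob", "age": 25},
    {"id": 3, "name": "Charlie", "age": 35},
]
batches = list(chunked(rows, batch_size=2))
for batch in batches:
    # In a real client, each batch would be one push to the pipeline
    print(len(batch), "rows")
```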
HTTP Input Configuration
The HTTP input connector is created automatically and uses the following default configuration:
- Format: CSV (default) or JSON
- Max buffered records: 100,000 records
- Backpressure: Enabled by default
When the pipeline is paused, HTTP input requests block until the pipeline resumes (unless force=True is specified).
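To make the buffering and backpressure behaviour concrete, here is a small model using a bounded queue: the producer blocks once the buffer is full and continues when the consumer drains it. This models the idea only; the tiny limit of 3 stands in for the 100,000-record default, and none of the names come from Feldera.

```python
import queue
import threading

MAX_BUFFERED = 3  # stand-in for the much larger default limit

buffer = queue.Queue(maxsize=MAX_BUFFERED)
consumed = []

def consumer(total):
    # Drain records one at a time, which frees space for the producer
    for _ in range(total):
        consumed.append(buffer.get())
        buffer.task_done()

total_records = 10
worker = threading.Thread(target=consumer, args=(total_records,))
worker.start()

for i in range(total_records):
    # put() blocks while the queue is full: this is the backpressure
    buffer.put({"id": i})

buffer.join()   # wait until every record has been processed
worker.join()
print(f"consumed {len(consumed)} records")
```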
Fault Tolerance
The HTTP input connector supports exactly-once fault tolerance. When fault tolerance is enabled, the connector stores received data in the write-ahead log, ensuring no data is lost on restart.
HTTP Output
The HTTP output connector streams pipeline output to HTTP clients as a response to a GET request. This is useful for browser-based applications and testing.
Subscribing to Output
Subscribe to a view's output stream:
handler = pipeline.listen(view_name="user_stats")
# Process chunks as they arrive
for chunk in handler:
    print(chunk)
The handler receives all changes from the point when the listener is created. To receive changes from the beginning, start the pipeline in paused mode, attach listeners, then resume:
pipeline.start_paused()
handler = pipeline.listen(view_name="user_stats")
pipeline.resume()
Processing Chunks with Callbacks
Run a callback on each output chunk:
def process_chunk(df, seq_no):
    print(f"Received chunk {seq_no} with {len(df)} rows")
    print(df.head())
pipeline.foreach_chunk(view_name="user_stats", callback=process_chunk)
The callback receives:
- df: A pandas DataFrame containing the chunk
- seq_no: A monotonically increasing sequence number
The callback runs in a separate thread and should be thread-safe.
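One way to keep such a callback thread-safe is to guard shared state with a lock. The sketch below is an assumption about how you might structure it: `ChunkCounter` is a hypothetical helper, and plain lists stand in for DataFrames so it runs without pandas.

```python
import threading

class ChunkCounter:
    """Accumulates row counts from a callback that may run on another thread."""

    def __init__(self):
        self._lock = threading.Lock()
        self.total_rows = 0
        self.last_seq_no = None

    def __call__(self, df, seq_no):
        # Same (df, seq_no) signature as the callback described above
        with self._lock:
            self.total_rows += len(df)
            self.last_seq_no = seq_no

counter = ChunkCounter()

# Simulate four chunks of five rows each arriving from separate threads
threads = [
    threading.Thread(target=counter, args=([0] * 5, seq_no))
    for seq_no in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter.total_rows)
```

An instance like `counter` could then be passed as the `callback` argument of `pipeline.foreach_chunk`.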
Output Format
HTTP output is streamed as JSON chunks:
{"sequence_number": 0, "text_data": "id,name,count\n1,Alice,10\n"}
{"sequence_number": 1, "text_data": "2,Bob,5\n"}
Each chunk contains:
- sequence_number: A monotonically increasing integer
- text_data: The output data in the configured format (CSV or JSON)
Empty chunks (with no text_data field) are sent periodically as keepalives.
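A client can decode this stream by parsing one JSON object per line, skipping keepalives, and concatenating `text_data`. A minimal sketch, assuming newline-delimited chunks as in the sample above; `collect_text` is a hypothetical helper, and the keepalive line shown is an assumed shape.

```python
import json

def collect_text(lines):
    """Return (last sequence number seen, concatenated text_data)."""
    last_seq = None
    parts = []
    for line in lines:
        if not line.strip():
            continue  # ignore blank lines between chunks
        chunk = json.loads(line)
        last_seq = chunk.get("sequence_number", last_seq)
        text = chunk.get("text_data")
        if text is None:
            continue  # keepalive: no payload to append
        parts.append(text)
    return last_seq, "".join(parts)

stream = [
    '{"sequence_number": 0, "text_data": "id,name,count\\n1,Alice,10\\n"}',
    '{"sequence_number": 1}',  # assumed shape of a keepalive chunk
    '{"sequence_number": 2, "text_data": "2,Bob,5\\n"}',
]
last_seq, text = collect_text(stream)
print(last_seq)
print(text, end="")
```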
Backpressure
The HTTP output connector supports optional backpressure:
- Backpressure enabled: The pipeline waits for the client to receive data before producing more output
- Backpressure disabled: The connector buffers up to 100 chunks and drops data if the client is too slow
Backpressure is enabled by default for HTTP output.
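The backpressure-disabled mode can be modelled with a bounded buffer. This is an illustrative model only: the text above does not say which chunks are dropped, so discarding the newest chunk when the buffer is full is an assumption, as are all names here.

```python
from collections import deque

class OutputBuffer:
    """Bounded chunk buffer modelling the backpressure-disabled mode."""

    def __init__(self, capacity=100):
        self.capacity = capacity
        self.chunks = deque()
        self.dropped = 0

    def offer(self, chunk):
        """Queue a chunk; return False if it was dropped (assumed policy)."""
        if len(self.chunks) >= self.capacity:
            self.dropped += 1
            return False
        self.chunks.append(chunk)
        return True

    def take(self):
        """Hand the oldest buffered chunk to the client, or None if empty."""
        return self.chunks.popleft() if self.chunks else None

buf = OutputBuffer(capacity=100)
# A stalled client: 105 chunks are produced and none are read
for seq in range(105):
    buf.offer({"sequence_number": seq})
print(len(buf.chunks), "buffered,", buf.dropped, "dropped")
```

With backpressure enabled, the producer would instead wait for `take` to free a slot rather than count a drop.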
Fault Tolerance
The HTTP output connector does not support fault tolerance. Output streams are ephemeral and do not survive pipeline restarts.
Use Cases
Testing and Debugging
Use HTTP input to manually push test data:
# Start pipeline in paused mode
pipeline.start_paused()
# Push test data
pipeline.input_json("users", {"id": 1, "name": "Alice"})
pipeline.input_json("users", {"id": 2, "name": "Bob"})
# Resume and check output
pipeline.resume()
results = list(pipeline.query("SELECT * FROM user_stats"))
Browser-Based Applications
Stream real-time updates to a web browser:
const response = await fetch(
  `http://localhost:8080/v0/pipelines/${pipelineName}/egress/${viewName}?format=json`
);
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  // A read can end mid-line, so keep the trailing partial line for the next read
  buffer += decoder.decode(value, { stream: true });
  const lines = buffer.split('\n');
  buffer = lines.pop();
  for (const line of lines) {
    if (line.trim()) {
      const data = JSON.parse(line);
      console.log('Received:', data);
    }
  }
}
Interactive Data Exploration
Push data interactively and inspect results:
# Push some data
pipeline.input_json("events", {"user_id": 1, "action": "login"})
# Query the results
for row in pipeline.query("SELECT * FROM event_summary"):
    print(row)
Limitations
The HTTP connector has several limitations:
- No authentication: HTTP endpoints are not authenticated (use API keys for the REST API)
- No reliable delivery: Output streams do not guarantee delivery; clients must handle reconnection
- No output fault tolerance: Output streams are ephemeral and do not survive restarts
- Limited throughput: HTTP input is not optimized for high-throughput ingestion (use Kafka for production)
For production workloads, use dedicated connectors like Kafka, S3, or database CDC connectors instead of HTTP.
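Where HTTP output is used anyway, clients have to handle reconnection themselves, and a retry loop with exponential backoff is one common approach. The sketch below is generic: `connect` is a hypothetical callable that opens the output stream, the delays are arbitrary, and `sleep` is injectable so the example runs instantly.

```python
import time

def with_reconnect(connect, max_attempts=5, base_delay=0.5, sleep=time.sleep):
    """Call `connect` until it succeeds, doubling the delay after each failure."""
    delay = base_delay
    for attempt in range(1, max_attempts + 1):
        try:
            return connect()
        except ConnectionError:
            if attempt == max_attempts:
                raise  # give up after the final attempt
            sleep(delay)
            delay *= 2

# Demo: a fake connection that fails twice before succeeding
attempts = []

def flaky_connect():
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("stream closed")
    return "connected"

delays = []
result = with_reconnect(flaky_connect, sleep=delays.append)
print(result, delays)
```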
What's Next
- Kafka Connector: Learn about the Kafka input and output connectors
- REST API Overview: Understand the full REST API for pipeline management
- Connector Orchestration: Control connector startup order and dependencies