
Last Updated: 3/19/2026


HTTP Connector

The HTTP connector enables Feldera pipelines to receive data via HTTP POST requests (input) and stream data to HTTP clients (output). This connector is primarily used for testing, debugging, and browser-based applications.

HTTP Input

The HTTP input connector allows you to push data to a pipeline via HTTP POST requests. This connector is instantiated automatically when you use the pipeline API to push data.

Pushing Data via API

Push JSON data to a table:

pipeline.input_json( table_name="users", data={"id": 1, "name": "Alice", "age": 30}, update_format="raw" )

Push an array of records:

pipeline.input_json(
    table_name="users",
    data=[
        {"id": 1, "name": "Alice", "age": 30},
        {"id": 2, "name": "Bob", "age": 25},
    ],
    update_format="raw",
)
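Under the hood, the SDK delivers these records to the pipeline as an HTTP POST. The sketch below builds, but does not send, a roughly equivalent request using only the standard library. The ingress path `/v0/pipelines/{pipeline}/ingress/{table}` and the `format`, `update_format`, and `array` query parameters are assumptions modeled on the egress URL used later on this page, and `build_ingress_request` is a hypothetical helper. Check the REST API reference before relying on any of them.

```python
import json
import urllib.parse
import urllib.request


def build_ingress_request(base_url, pipeline, table, data, update_format="raw"):
    """Build (but do not send) a POST request that pushes JSON data to a table.

    ASSUMPTION: the ingress path and query parameters mirror the egress
    endpoint shown in this document; verify them against the REST API docs.
    """
    params = {
        "format": "json",
        "update_format": update_format,
        # A list is sent as one JSON array; a single record as one object.
        "array": "true" if isinstance(data, list) else "false",
    }
    url = (
        f"{base_url}/v0/pipelines/{urllib.parse.quote(pipeline)}"
        f"/ingress/{urllib.parse.quote(table)}?{urllib.parse.urlencode(params)}"
    )
    body = json.dumps(data).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},
    )


req = build_ingress_request(
    "http://localhost:8080",
    "my_pipeline",  # hypothetical pipeline name
    "users",
    [{"id": 1, "name": "Alice", "age": 30}, {"id": 2, "name": "Bob", "age": 25}],
)
print(req.get_method(), req.full_url)
# Against a running pipeline, the request could be sent with urllib.request.urlopen(req).
```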

Update Formats

The HTTP input connector supports two update formats:

Raw format (update_format="raw"): each record is an insert:

{"id": 1, "name": "Alice", "age": 30}

Insert/delete format (update_format="insert_delete"): each record is explicitly marked as an insert or a delete:

{ "insert": {"id": 1, "name": "Alice", "age": 30} }
{ "delete": {"id": 1, "name": "Alice", "age": 30} }
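Using only the insert and delete operations shown above, an update to an existing row can be expressed as a delete of the old row followed by an insert of the new one. `replace_row` below is a hypothetical helper that builds such a list. The result could be passed as `data` to `pipeline.input_json(...)` with `update_format="insert_delete"`, assuming that is the SDK's name for this format.

```python
import json


def replace_row(old_row, new_row):
    """Express an update as a delete of the old row plus an insert of the new one.

    Hypothetical helper: the output uses the {"insert": ...} / {"delete": ...}
    record shape shown above.
    """
    return [{"delete": old_row}, {"insert": new_row}]


changes = replace_row(
    {"id": 1, "name": "Alice", "age": 30},
    {"id": 1, "name": "Alice", "age": 31},
)
# Print one JSON object per line, matching the examples above.
print("\n".join(json.dumps(change) for change in changes))
```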

Forcing Input on Paused Pipelines

By default, the Python SDK raises an error if you push data to a paused pipeline. Pass force=True to push the data anyway:

pipeline.input_json( table_name="users", data={"id": 1, "name": "Alice"}, force=True )
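The rule described above boils down to a small guard. This is a minimal sketch of the documented behavior, not the SDK's actual code; `check_can_push` and the status strings `"running"` and `"paused"` are hypothetical placeholders.

```python
def check_can_push(status, force=False):
    """Raise if pushing data is not allowed in the given (hypothetical) status."""
    if status == "paused" and not force:
        raise RuntimeError("pipeline is paused; pass force=True to push anyway")


check_can_push("running")             # allowed
check_can_push("paused", force=True)  # allowed: force overrides the pause
try:
    check_can_push("paused")          # rejected without force
except RuntimeError as err:
    print(f"rejected: {err}")
```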

Pushing Pandas DataFrames

Push a pandas DataFrame to a table:

import pandas as pd

df = pd.DataFrame({
    "id": [1, 2, 3],
    "name": ["Alice", "Bob", "Charlie"],
    "age": [30, 25, 35],
})
pipeline.input_pandas(table_name="users", df=df)

The DataFrame is automatically chunked and sent to the pipeline in batches.
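Chunking can be pictured as splitting the rows into fixed-size batches and sending one request per batch. The sketch below shows only the batching step, using the standard library on row-oriented records (the same shape that `df.to_dict(orient="records")` returns). The batch size of 2 is arbitrary; this page does not state the SDK's actual batch size, and `batches` is a hypothetical helper.

```python
from itertools import islice


def batches(rows, size):
    """Yield successive lists of at most `size` rows."""
    if size <= 0:
        raise ValueError("size must be positive")
    it = iter(rows)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


# Row-oriented records equivalent to the DataFrame above.
rows = [
    {"id": 1, "name": "Alice", "age": 30},
    {"id": 2, "name": "Bob", "age": 25},
    {"id": 3, "name": "Charlie", "age": 35},
]
for batch in batches(rows, 2):
    # In a real client, each batch would become one HTTP request.
    print([row["name"] for row in batch])
```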

HTTP Input Configuration

The HTTP input connector is created automatically and uses the following default configuration:

  • Format: CSV (default) or JSON
  • Max buffered records: 100,000 records
  • Backpressure: enabled by default

When the pipeline is paused, HTTP input requests block until the pipeline resumes (unless force=True is specified).
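Input backpressure means that a producer is held back once the connector's buffer is full, rather than having its records dropped. The sketch below models this with a bounded `queue.Queue`. The capacity of 3 stands in for the 100,000-record default, and the model is only an illustration of the idea, not Feldera's internal implementation.

```python
import queue

# Small capacity standing in for the connector's much larger buffer.
buffer = queue.Queue(maxsize=3)

for record_id in range(3):
    buffer.put({"id": record_id})  # fits: the buffer is not yet full

try:
    # With backpressure, a fourth put waits for space; here we give up after 0.1 s.
    buffer.put({"id": 3}, timeout=0.1)
    accepted = True
except queue.Full:
    accepted = False

print("fourth record accepted:", accepted)

# Once the consumer (the pipeline) drains a record, the producer can proceed.
buffer.get()
buffer.put({"id": 3}, timeout=0.1)
print("buffered records:", buffer.qsize())
```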

Fault Tolerance

The HTTP input connector supports exactly-once fault tolerance. When fault tolerance is enabled, the connector stores received data in the write-ahead log, ensuring no data is lost on restart.
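The write-ahead log idea is that input is made durable before it is processed, so it can be replayed after a restart. The sketch below illustrates the general technique with a JSON-lines file. It is a generic example, not Feldera's log format or recovery logic, and the class name `WriteAheadLog` is hypothetical.

```python
import json
import os
import tempfile


class WriteAheadLog:
    """Append-only JSON-lines log: records are persisted before being processed."""

    def __init__(self, path):
        self.path = path

    def append(self, record):
        with open(self.path, "a", encoding="utf-8") as f:
            f.write(json.dumps(record) + "\n")
            f.flush()
            os.fsync(f.fileno())  # make the write durable before acknowledging it

    def replay(self):
        """Return every logged record, in order, e.g. after a restart."""
        if not os.path.exists(self.path):
            return []
        with open(self.path, encoding="utf-8") as f:
            return [json.loads(line) for line in f if line.strip()]


log_path = os.path.join(tempfile.mkdtemp(), "input.wal")
wal = WriteAheadLog(log_path)
wal.append({"id": 1, "name": "Alice"})
wal.append({"id": 2, "name": "Bob"})

# Simulate a restart: a fresh object recovers the same input from disk.
recovered = WriteAheadLog(log_path).replay()
print(recovered)
```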

HTTP Output

The HTTP output connector streams pipeline output to HTTP clients as a response to a GET request. This is useful for browser-based applications and testing.

Subscribing to Output

Subscribe to a view's output stream:

handler = pipeline.listen(view_name="user_stats")

# Process chunks as they arrive
for chunk in handler:
    print(chunk)

The handler receives all changes from the point when the listener is created. To receive changes from the beginning, start the pipeline in paused mode, attach listeners, then resume:

pipeline.start_paused()
handler = pipeline.listen(view_name="user_stats")
pipeline.resume()
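This ordering matters because a listener only sees changes emitted after it subscribes. The toy broadcaster below is entirely hypothetical and is not how the SDK is implemented. It shows a subscriber attached before any output seeing every change, while a late subscriber misses the earlier ones.

```python
class Broadcaster:
    """Toy model of an output stream: delivers each change to current subscribers only."""

    def __init__(self):
        self.subscribers = []

    def listen(self):
        received = []
        self.subscribers.append(received)
        return received

    def emit(self, change):
        for received in self.subscribers:
            received.append(change)


stream = Broadcaster()
early = stream.listen()  # attached before any output (like start_paused + listen)
stream.emit({"id": 1, "count": 10})
late = stream.listen()   # attached after output has started
stream.emit({"id": 2, "count": 5})

print("early:", early)
print("late:", late)
```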

Processing Chunks with Callbacks

Run a callback on each output chunk:

def process_chunk(df, seq_no):
    print(f"Received chunk {seq_no} with {len(df)} rows")
    print(df.head())

pipeline.foreach_chunk(view_name="user_stats", callback=process_chunk)

The callback receives:

  • df: a pandas DataFrame containing the chunk
  • seq_no: a monotonically increasing sequence number

The callback runs in a separate thread and should be thread-safe.
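Because the callback runs on a separate thread, any state it shares with the rest of the program should be protected. The sketch below guards a running row count with a `threading.Lock` and follows the `(df, seq_no)` signature described above. To stay self-contained, the simulation passes plain lists in place of DataFrames, since this callback only calls `len()` on them.

```python
import threading

lock = threading.Lock()
stats = {"chunks": 0, "rows": 0, "last_seq_no": None}


def process_chunk(df, seq_no):
    """Callback with the (df, seq_no) signature; safe to call from other threads."""
    with lock:  # serialize updates to the shared dictionary
        stats["chunks"] += 1
        stats["rows"] += len(df)
        stats["last_seq_no"] = seq_no


# Simulate deliveries from several threads (lists stand in for DataFrames).
threads = [
    threading.Thread(target=process_chunk, args=([None] * n, i))
    for i, n in enumerate([3, 1, 4])
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(stats["chunks"], stats["rows"])
```

With a real pipeline, the same function would be registered through `pipeline.foreach_chunk(view_name="user_stats", callback=process_chunk)`.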

Output Format

HTTP output is streamed as JSON chunks:

{"sequence_number": 0, "text_data": "id,name,count\n1,Alice,10\n"}
{"sequence_number": 1, "text_data": "2,Bob,5\n"}

Each chunk contains:

  • sequence_number: a monotonically increasing integer
  • text_data: the output data in the configured format (CSV or JSON)

Empty chunks (with no text_data field) are sent periodically as keepalives.
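A client reading this stream can skip keepalive chunks, concatenate the `text_data` payloads, and parse the result as CSV. The sketch below does this for the CSV example above using only the standard library. It assumes, as in that example, that the header row arrives once in the first chunk. The exact shape of a keepalive is not documented here, so it is modeled as an object without `text_data`. `parse_csv_chunks` is a hypothetical helper.

```python
import csv
import io
import json

# The two chunks from the example above, plus an assumed keepalive line.
raw_lines = [
    '{"sequence_number": 0, "text_data": "id,name,count\\n1,Alice,10\\n"}',
    "{}",
    '{"sequence_number": 1, "text_data": "2,Bob,5\\n"}',
]


def parse_csv_chunks(lines):
    """Concatenate text_data from non-keepalive chunks and parse it as CSV."""
    text_parts = []
    last_seq = -1
    for line in lines:
        if not line.strip():
            continue
        chunk = json.loads(line)
        if "text_data" not in chunk:
            continue  # keepalive: carries no data
        seq = chunk["sequence_number"]
        if seq <= last_seq:
            raise ValueError(f"sequence number went backwards: {seq} after {last_seq}")
        last_seq = seq
        text_parts.append(chunk["text_data"])
    return list(csv.DictReader(io.StringIO("".join(text_parts))))


rows = parse_csv_chunks(raw_lines)
print(rows)
```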

Backpressure

The HTTP output connector supports optional backpressure:

  • Backpressure enabled: the pipeline waits for the client to receive data before producing more output
  • Backpressure disabled: the connector buffers up to 100 chunks and drops data if the client is too slow

Backpressure is enabled by default for HTTP output.
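With backpressure disabled, the connector behaves like a bounded buffer that discards data once the client falls behind. The sketch below models this with `collections.deque(maxlen=100)`. This page does not specify which chunks the real connector discards; this model drops the oldest ones.

```python
from collections import deque

MAX_BUFFERED_CHUNKS = 100  # the limit quoted above for backpressure-disabled mode

# A deque with maxlen silently discards items from the opposite end when full.
pending = deque(maxlen=MAX_BUFFERED_CHUNKS)

# The pipeline produces 150 chunks while a slow client reads none of them.
for seq_no in range(150):
    pending.append({"sequence_number": seq_no})

dropped = 150 - len(pending)
print(f"buffered={len(pending)} dropped={dropped} oldest={pending[0]['sequence_number']}")
```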

Fault Tolerance

The HTTP output connector does not support fault tolerance. Output streams are ephemeral and do not survive pipeline restarts.

Use Cases

Testing and Debugging

Use HTTP input to manually push test data:

# Start the pipeline in paused mode
pipeline.start_paused()

# Push test data; force=True is required while the pipeline is paused
pipeline.input_json("users", {"id": 1, "name": "Alice"}, force=True)
pipeline.input_json("users", {"id": 2, "name": "Bob"}, force=True)

# Resume processing, then query the view
pipeline.resume()
results = list(pipeline.query("SELECT * FROM user_stats"))

Browser-Based Applications

Stream real-time updates to a web browser:

const response = await fetch(
  `http://localhost:8080/v0/pipelines/${pipelineName}/egress/${viewName}?format=json`
);
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffered = ''; // partial line left over from the previous read
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  // stream: true keeps multi-byte characters split across reads intact
  buffered += decoder.decode(value, { stream: true });
  const lines = buffered.split('\n');
  buffered = lines.pop(); // the last element may be an incomplete line
  for (const line of lines) {
    if (line.trim()) {
      const data = JSON.parse(line);
      console.log('Received:', data);
    }
  }
}
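A network read can end in the middle of a JSON line, so any streaming client should buffer partial lines before parsing. The Python sketch below does this for newline-delimited JSON arriving in arbitrary byte chunks, using an incremental UTF-8 decoder so multi-byte characters split across reads survive. It is fed byte chunks directly so it can run without a pipeline; with a live egress stream, the chunks would come from reading the HTTP response incrementally. `iter_json_lines` is a hypothetical helper.

```python
import codecs
import json


def iter_json_lines(byte_chunks):
    """Yield parsed JSON objects from newline-delimited JSON split across byte chunks."""
    decoder = codecs.getincrementaldecoder("utf-8")()  # handles characters split across reads
    buffer = ""
    for raw in byte_chunks:
        buffer += decoder.decode(raw)
        *complete, buffer = buffer.split("\n")  # keep the trailing partial line
        for line in complete:
            if line.strip():
                yield json.loads(line)
    buffer += decoder.decode(b"", final=True)
    if buffer.strip():
        yield json.loads(buffer)


# A stream whose reads split JSON objects mid-way.
reads = [
    b'{"sequence_number": 0, "text_da',
    b'ta": "1,Alice,10\\n"}\n{"seque',
    b'nce_number": 1}\n',
]
for obj in iter_json_lines(reads):
    print(obj)
```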

Interactive Data Exploration

Push data interactively and inspect results:

# Push some data
pipeline.input_json("events", {"user_id": 1, "action": "login"})

# Query the results
for row in pipeline.query("SELECT * FROM event_summary"):
    print(row)

Limitations

The HTTP connector has several limitations:

  1. No authentication: HTTP endpoints are not authenticated (use API keys for the REST API)
  2. No reliable delivery: output streams do not guarantee delivery; clients must handle reconnection
  3. No output fault tolerance: output streams are ephemeral and do not survive restarts
  4. Limited throughput: HTTP input is not optimized for high-throughput ingestion (use Kafka for production)
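Because output streams do not guarantee delivery, a long-running client typically wraps its connection in a retry loop. The sketch below is a generic exponential-backoff reconnect helper; `consume_with_reconnect`, `connect`, and `handle` are hypothetical, and the delays are illustrative. Given that a listener only receives changes from the moment it is created, a reconnected client presumably misses whatever was emitted while it was disconnected.

```python
import time


def consume_with_reconnect(connect, handle, max_attempts=5, base_delay=0.5, sleep=time.sleep):
    """Call connect() and pass each received item to handle(), reconnecting on errors.

    `connect` must return an iterable of items (e.g. parsed output chunks).
    Retries wait base_delay, 2 * base_delay, 4 * base_delay, ... between attempts.
    """
    attempt = 0
    while True:
        try:
            for item in connect():
                attempt = 0  # reset the backoff once data flows again
                handle(item)
            return  # the stream ended normally
        except (ConnectionError, TimeoutError):
            attempt += 1
            if attempt >= max_attempts:
                raise
            sleep(base_delay * 2 ** (attempt - 1))


# Example: a fake connection that drops twice before succeeding.
calls = {"n": 0}


def flaky_connect():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("stream dropped")
    return iter([{"sequence_number": 0}, {"sequence_number": 1}])


received, delays = [], []
consume_with_reconnect(flaky_connect, received.append, sleep=delays.append)
print(received, delays)
```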

For production workloads, use dedicated connectors like Kafka, S3, or database CDC connectors instead of HTTP.

What's Next