Internal site. Jolli authentication required to view.
Skip to Content
βš™οΈ OperationsMetrics Monitoring

Last Updated: 3/19/2026


Metrics & Monitoring

Feldera pipelines expose comprehensive metrics for monitoring performance, throughput, and resource usage. These metrics are available via the REST API, Python SDK, and Prometheus endpoints.

Pipeline Statistics

Get pipeline statistics using the Python SDK:

stats = pipeline.stats() # Global metrics print(f"Total input records: {stats.global_metrics.total_input_records}") print(f"Total processed records: {stats.global_metrics.total_processed_records}") print(f"Pipeline complete: {stats.global_metrics.pipeline_complete}")

Global Metrics

Throughput:

  • total_input_records β€” Total records received from all input connectors
  • total_processed_records β€” Total records processed by the pipeline
  • pipeline_complete β€” Whether all inputs have been fully processed

Performance:

  • buffered_input_records β€” Records buffered but not yet processed
  • total_input_bytes β€” Total bytes received from inputs
  • total_processed_bytes β€” Total bytes processed

State:

  • state β€” Current pipeline state (running, paused, etc.)
  • current_step β€” Current processing step number

Transactions:

  • transaction_status β€” Current transaction status
  • transaction_id β€” ID of active transaction (0 if none)

Connector Statistics

Monitor individual input and output connectors:

# Input connector stats input_stats = pipeline.input_connector_stats("users", "users_connector") print(f"Records received: {input_stats.total_records}") print(f"Parse errors: {input_stats.num_parse_errors}") # Output connector stats output_stats = pipeline.output_connector_stats("results", "results_connector") print(f"Records transmitted: {output_stats.transmitted_records}")

Input Connector Metrics

  • total_records β€” Total records received
  • total_bytes β€” Total bytes received
  • num_parse_errors β€” Number of parsing errors
  • end_of_input β€” Whether connector has finished
  • buffered_records β€” Records buffered but not processed

Output Connector Metrics

  • transmitted_records β€” Records sent to output
  • transmitted_bytes β€” Bytes sent to output
  • buffered_records β€” Records buffered for output
  • num_encode_errors β€” Number of encoding errors

Prometheus Metrics

Feldera exposes metrics in Prometheus format at the /metrics endpoint:

curl http://localhost:8080/metrics

Key Prometheus Metrics

Pipeline throughput:

  • feldera_input_records_total β€” Total input records by connector
  • feldera_processed_records_total β€” Total processed records
  • feldera_output_records_total β€” Total output records by connector

Performance:

  • feldera_step_duration_seconds β€” Time spent in each processing step
  • feldera_buffered_records β€” Number of buffered records

Errors:

  • feldera_parse_errors_total β€” Total parse errors by connector
  • feldera_encode_errors_total β€” Total encode errors by connector

Resource usage:

  • feldera_memory_bytes β€” Memory usage
  • feldera_cpu_seconds_total β€” CPU time used

Monitoring Patterns

Checking Pipeline Health

def check_pipeline_health(pipeline): stats = pipeline.stats() # Check if pipeline is processing if stats.global_metrics.total_input_records == 0: print("WARNING: No input records received") # Check for backlog buffered = stats.global_metrics.buffered_input_records if buffered > 1000000: print(f"WARNING: Large backlog: {buffered} records") # Check for parse errors for name, connector in stats.inputs.items(): if connector.num_parse_errors > 0: print(f"ERROR: {connector.num_parse_errors} parse errors in {name}")

Monitoring Throughput

import time def monitor_throughput(pipeline, interval=5): prev_processed = 0 while True: stats = pipeline.stats() processed = stats.global_metrics.total_processed_records rate = (processed - prev_processed) / interval print(f"Throughput: {rate:.0f} records/sec") prev_processed = processed time.sleep(interval)

Detecting Completion

def wait_for_completion(pipeline): while True: if pipeline.is_complete(): print("Pipeline completed processing all inputs") break time.sleep(1)

Grafana Dashboards

Create Grafana dashboards using Prometheus metrics:

Throughput panel:

rate(feldera_input_records_total[1m])

Latency panel:

rate(feldera_step_duration_seconds[1m])

Error rate panel:

rate(feldera_parse_errors_total[1m])

Logging

Access pipeline logs:

for log_line in pipeline.logs(): print(log_line)

Configure log filtering in runtime config:

{ "logging": "info,feldera=debug" }

Performance Profiling

Enable CPU profiling:

{ "cpu_profiler": true }

Generate a support bundle with profiling data:

bundle = pipeline.support_bundle( circuit_profile=True, heap_profile=True, metrics=True, logs=True )

What’s Next