Last Updated: 3/19/2026
Metrics & Monitoring
Feldera pipelines expose comprehensive metrics for monitoring performance, throughput, and resource usage. These metrics are available via the REST API, Python SDK, and Prometheus endpoints.
Pipeline Statistics
Get pipeline statistics using the Python SDK:
stats = pipeline.stats()
# Global metrics
print(f"Total input records: {stats.global_metrics.total_input_records}")
print(f"Total processed records: {stats.global_metrics.total_processed_records}")
print(f"Pipeline complete: {stats.global_metrics.pipeline_complete}")
Global Metrics
Throughput:
- total_input_records: Total records received from all input connectors
- total_processed_records: Total records processed by the pipeline
- pipeline_complete: Whether all inputs have been fully processed
Performance:
- buffered_input_records: Records buffered but not yet processed
- total_input_bytes: Total bytes received from inputs
- total_processed_bytes: Total bytes processed
State:
- state: Current pipeline state (running, paused, etc.)
- current_step: Current processing step number
Transactions:
- transaction_status: Current transaction status
- transaction_id: ID of the active transaction (0 if none)
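To illustrate how the throughput counters relate to each other, the sketch below derives an unprocessed-record count and a processed fraction from a metrics object. The helper name `summarize_progress` is hypothetical and not part of the Feldera SDK. It assumes only an object exposing `total_input_records` and `total_processed_records`, and that both counters count the same records. The sample numbers are made up.

```python
from types import SimpleNamespace


def summarize_progress(metrics):
    """Return (unprocessed, fraction_processed) for a global-metrics-like object.

    Hypothetical helper: assumes `metrics` exposes total_input_records and
    total_processed_records as non-negative integers.
    """
    received = metrics.total_input_records
    processed = metrics.total_processed_records
    # Clamp at zero in case the two counters are sampled at slightly different times
    unprocessed = max(received - processed, 0)
    # Avoid dividing by zero before any input has arrived
    fraction = processed / received if received else 0.0
    return unprocessed, fraction


# Stand-in for stats.global_metrics, using made-up numbers for illustration
sample = SimpleNamespace(total_input_records=1000, total_processed_records=750)
print(summarize_progress(sample))  # prints (250, 0.75)
```

In a real pipeline the same function could presumably be called with `pipeline.stats().global_metrics`, provided the attribute names match those listed above.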
Connector Statistics
Monitor individual input and output connectors:
# Input connector stats
input_stats = pipeline.input_connector_stats("users", "users_connector")
print(f"Records received: {input_stats.total_records}")
print(f"Parse errors: {input_stats.num_parse_errors}")
# Output connector stats
output_stats = pipeline.output_connector_stats("results", "results_connector")
print(f"Records transmitted: {output_stats.transmitted_records}")
Input Connector Metrics
- total_records: Total records received
- total_bytes: Total bytes received
- num_parse_errors: Number of parsing errors
- end_of_input: Whether the connector has finished
- buffered_records: Records buffered but not yet processed
Output Connector Metrics
- transmitted_records: Records sent to output
- transmitted_bytes: Bytes sent to output
- buffered_records: Records buffered for output
- num_encode_errors: Number of encoding errors
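Raw error counts are easier to interpret relative to connector volume. The following is a minimal sketch with hypothetical helper names (`error_ratio` and `connector_error_ratios` are not SDK functions). It assumes stats objects that expose the fields listed above, and it treats the record counts as the denominator, which is an assumption about how errors are counted. The sample values are invented.

```python
from types import SimpleNamespace


def error_ratio(errors, total):
    """Return errors / total, or 0.0 when no records have been seen yet."""
    return errors / total if total else 0.0


def connector_error_ratios(input_stats, output_stats):
    """Hypothetical helper: parse and encode error ratios for one connector pair.

    Assumes input_stats has num_parse_errors and total_records, and
    output_stats has num_encode_errors and transmitted_records.
    """
    return {
        "parse": error_ratio(input_stats.num_parse_errors, input_stats.total_records),
        "encode": error_ratio(output_stats.num_encode_errors, output_stats.transmitted_records),
    }


# Stand-ins for connector stats objects, with made-up values
inputs = SimpleNamespace(num_parse_errors=5, total_records=200)
outputs = SimpleNamespace(num_encode_errors=0, transmitted_records=100)
print(connector_error_ratios(inputs, outputs))
```

A ratio is usually a steadier alerting signal than an absolute count, since a handful of errors means something different at a hundred records than at a hundred million.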
Prometheus Metrics
Feldera exposes metrics in Prometheus format at the /metrics endpoint:
curl http://localhost:8080/metrics
Key Prometheus Metrics
Pipeline throughput:
- feldera_input_records_total: Total input records by connector
- feldera_processed_records_total: Total processed records
- feldera_output_records_total: Total output records by connector
Performance:
- feldera_step_duration_seconds: Time spent in each processing step
- feldera_buffered_records: Number of buffered records
Errors:
- feldera_parse_errors_total: Total parse errors by connector
- feldera_encode_errors_total: Total encode errors by connector
Resource usage:
- feldera_memory_bytes: Memory usage
- feldera_cpu_seconds_total: CPU time used
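For ad hoc checks without a Prometheus server, the text returned by the /metrics endpoint can be read directly. The sketch below is a simplified parser for the Prometheus text exposition format, not a full implementation: it handles `name value` and `name{labels} value` lines, skips comments, and ignores label escaping. The function name `parse_metric_samples`, the `connector` label, and all sample values are assumptions made for illustration.

```python
def parse_metric_samples(text, metric_name):
    """Return {label_string: value} for one metric in Prometheus text format.

    Simplified sketch: skips comment lines, takes the first field after the
    name (or after the closing brace) as the value, and ignores timestamps.
    """
    samples = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        if "{" in line and "}" in line:
            name = line[: line.index("{")]
            labels = line[line.index("{") + 1 : line.rindex("}")]
            rest = line[line.rindex("}") + 1 :].split()
        else:
            fields = line.split()
            name, labels, rest = fields[0], "", fields[1:]
        if name != metric_name or not rest:
            continue
        samples[labels] = float(rest[0])
    return samples


# Made-up exposition text; the label name "connector" is an assumption
SAMPLE = """\
# TYPE feldera_input_records_total counter
feldera_input_records_total{connector="users_connector"} 1200
feldera_input_records_total{connector="orders_connector"} 300
feldera_processed_records_total 1500
"""

per_connector = parse_metric_samples(SAMPLE, "feldera_input_records_total")
print(sum(per_connector.values()))  # prints 1500.0
```

For anything beyond quick inspection, a maintained parser such as the one in the official Prometheus Python client is likely a better choice than hand-rolled string handling.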
Monitoring Patterns
Checking Pipeline Health
def check_pipeline_health(pipeline):
    stats = pipeline.stats()
    # Check if pipeline is processing
    if stats.global_metrics.total_input_records == 0:
        print("WARNING: No input records received")
    # Check for backlog
    buffered = stats.global_metrics.buffered_input_records
    if buffered > 1000000:
        print(f"WARNING: Large backlog: {buffered} records")
    # Check for parse errors
    for name, connector in stats.inputs.items():
        if connector.num_parse_errors > 0:
            print(f"ERROR: {connector.num_parse_errors} parse errors in {name}")
Monitoring Throughput
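import time

def monitor_throughput(pipeline, interval=5):
    # Seed the baseline from the current total so the first reading
    # is not inflated by records processed before monitoring started
    prev_processed = pipeline.stats().global_metrics.total_processed_records
    while True:
        time.sleep(interval)
        stats = pipeline.stats()
        processed = stats.global_metrics.total_processed_records
        # Average rate over the last interval
        rate = (processed - prev_processed) / interval
        print(f"Throughput: {rate:.0f} records/sec")
        prev_processed = processed
Detecting Completion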
import time

def wait_for_completion(pipeline):
    # Poll once per second until all inputs have been processed
    while True:
        if pipeline.is_complete():
            print("Pipeline completed processing all inputs")
            break
        time.sleep(1)
Grafana Dashboards
Create Grafana dashboards using Prometheus metrics:
Throughput panel:
rate(feldera_input_records_total[1m])
Latency panel:
rate(feldera_step_duration_seconds[1m])
Error rate panel:
rate(feldera_parse_errors_total[1m])
Logging
Access pipeline logs:
for log_line in pipeline.logs():
    print(log_line)
Configure log filtering in runtime config:
{
  "logging": "info,feldera=debug"
}
Performance Profiling
Enable CPU profiling:
{
  "cpu_profiler": true
}
Generate a support bundle with profiling data:
bundle = pipeline.support_bundle(
    circuit_profile=True,
    heap_profile=True,
    metrics=True,
    logs=True
)
What's Next
- Pipeline Configuration: Configure profiling, logging, and other pipeline settings.
- Memory Management: Monitor and optimize memory usage in your pipelines.
- Pipeline Lifecycle: Understand pipeline states and lifecycle transitions.