Last Updated: 3/19/2026
Snowflake and Redis Sinks
Feldera supports writing pipeline output to Snowflake data warehouses and Redis key-value stores. These connectors enable integration with downstream analytics and caching systems.
Redis Output Connector
The Redis output connector writes pipeline output to Redis as key-value pairs or streams.
Basic Configuration
outputs:
  cache:
    stream: user_stats
    connector_config:
      transport:
        name: redis_output
        config:
          connection_string: "redis://localhost:6379"
      format:
        name: json
Configuration Options
Connection:
- connection_string: Redis connection URL (e.g., "redis://localhost:6379", "rediss://user:pass@host:6380")
Authentication:
- Include credentials in the connection string: "redis://username:password@host:port"
- Use TLS with the rediss:// scheme
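To make the URL conventions above concrete, the following is a minimal sketch that splits a connection string into its parts using only the Python standard library. The helper name describe_redis_url is hypothetical and is not part of Feldera or any Redis client. Falling back to port 6379 when none is given is an assumption based on the standard Redis default; the source does not state how the connector treats a missing port.

```python
from urllib.parse import urlsplit


def describe_redis_url(url: str) -> dict:
    """Split a Redis connection URL into its parts (illustrative helper only)."""
    parts = urlsplit(url)
    if parts.scheme not in ("redis", "rediss"):
        raise ValueError(f"unsupported scheme: {parts.scheme!r}")
    return {
        # "rediss" (double s) selects TLS; plain "redis" does not.
        "tls": parts.scheme == "rediss",
        # Credentials are optional and are None when absent from the URL.
        "username": parts.username,
        "password": parts.password,
        "host": parts.hostname,
        # Assumption: fall back to the standard Redis port when none is given.
        "port": parts.port or 6379,
    }


print(describe_redis_url("redis://localhost:6379"))
print(describe_redis_url("rediss://user:pass@host:6380"))
```

A check like this can catch a mistyped scheme or missing credentials before the pipeline is deployed.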
Key-Value Mode
Write output as key-value pairs using an index:
CREATE TABLE user_stats (
    user_id INTEGER PRIMARY KEY,
    event_count INTEGER,
    last_event TIMESTAMP
);
CREATE INDEX idx_user_stats ON user_stats(user_id);
outputs:
  cache:
    stream: user_stats
    index: idx_user_stats
    connector_config:
      transport:
        name: redis_output
        config:
          connection_string: "redis://localhost:6379"
      format:
        name: json
The connector uses the index key as the Redis key and the full record as the value.
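The key-value mapping described above can be modeled in memory as follows. This is only a sketch of the idea, not the connector's implementation: the function name to_key_value_pairs is hypothetical, no Redis calls are made, and the exact key encoding (for example, any prefix or separator) and JSON layout used by the connector are not specified here, so the plain string key and sorted-key JSON value are assumptions.

```python
import json


def to_key_value_pairs(records, key_column):
    """Model key-value mode: index column -> key, full record -> JSON value."""
    pairs = {}
    for record in records:
        # Assumption: the key is the index column rendered as a string.
        key = str(record[key_column])
        # The whole record, including the key column, becomes the value.
        pairs[key] = json.dumps(record, sort_keys=True)
    return pairs


records = [
    {"user_id": 1, "event_count": 12, "last_event": "2024-01-15 10:00:00"},
    {"user_id": 2, "event_count": 7, "last_event": "2024-01-15 10:01:00"},
]
pairs = to_key_value_pairs(records, "user_id")
for key, value in pairs.items():
    print(key, value)
```

In this model, a later record with the same user_id replaces the earlier value, which matches the usual reason for choosing a keyed layout: looking up the latest state for a given key.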
Stream Mode
Write output as a Redis stream (without an index):
outputs:
  events:
    stream: processed_events
    connector_config:
      transport:
        name: redis_output
        config:
          connection_string: "redis://localhost:6379"
      format:
        name: json
Use Cases
Caching query results:
CREATE MATERIALIZED VIEW top_users AS
SELECT user_id, COUNT(*) as count
FROM events
GROUP BY user_id
ORDER BY count DESC
LIMIT 100;
Real-time leaderboards:
CREATE MATERIALIZED VIEW leaderboard AS
SELECT
    user_id,
    score,
    RANK() OVER (ORDER BY score DESC) as rank
FROM user_scores
ORDER BY rank
LIMIT 10;
Snowflake Output Connector
The Snowflake connector writes pipeline output to Snowflake tables via Kafka and Snowpipe.
Architecture
Feldera → Kafka → Snowpipe → Snowflake
Configuration
Write to Kafka with Snowflake-compatible format:
outputs:
  warehouse:
    stream: aggregated_results
    connector_config:
      transport:
        name: kafka_output
        config:
          bootstrap.servers: "kafka:9092"
          topic: "snowflake-ingestion"
      format:
        name: json
        config:
          update_format: "snowflake"
Snowpipe Setup
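Configure a Snowpipe pipe that loads the JSON data staged from the Kafka topic into the target table (my_pipe, my_table, and my_stage are placeholder names):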
CREATE PIPE my_pipe AS
    COPY INTO my_table
    FROM @my_stage
    FILE_FORMAT = (TYPE = JSON);
Data Format
The Snowflake format produces JSON records compatible with Snowpipe:
{"user_id": 1, "count": 100, "timestamp": "2024-01-15T10:00:00Z"}
{"user_id": 2, "count": 200, "timestamp": "2024-01-15T10:01:00Z"}
Use Cases
Data warehouse loading:
CREATE MATERIALIZED VIEW daily_summary AS
SELECT
    DATE_TRUNC('day', event_time) as day,
    user_id,
    COUNT(*) as event_count,
    SUM(amount) as total_amount
FROM events
GROUP BY day, user_id;
Real-time reporting:
CREATE MATERIALIZED VIEW hourly_metrics AS
SELECT
    DATE_TRUNC('hour', event_time) as hour,
    metric_name,
    AVG(value) as avg_value,
    MAX(value) as max_value
FROM metrics
GROUP BY hour, metric_name;
What's Next
- Kafka Connector: Learn about Kafka output configuration
- Connector Orchestration: Control connector startup order
- Materialized Views: Define views for output