Skip to main content

Kafka & Redpanda

Use the SQL statement below to connect RisingWave to a Kafka/Redpanda broker.

Syntax

CREATE [ MATERIALIZED ] SOURCE [ IF NOT EXISTS ] source_name (
column_name data_type, ...
)
WITH (
connector='kafka',
field_name='value', ...
)
ROW FORMAT JSON | PROTOBUF MESSAGE 'main_message'
[ ROW SCHEMA LOCATION 's3://path' ];

WITH options

FieldDefaultTypeDescriptionRequired?
kafka.topicNoneStringAddress of the Kafka topic. One source can only correspond to one topic.True
kafka.brokersNoneStringAddress of the Kafka broker. Format: 'ip:port,ip:port'.True
kafka.scan.startup.modeearliestStringThe Kafka consumer starts consuming data from the commit offset. This includes two values: 'earliest' and 'latest'.False
kafka.time.offsetNoneInt64Specify the offset in seconds from a certain point of time.False
kafka.consumer.groupNoneStringName of the Kafka consumer groupTrue

Example

Here is an example of connecting RisingWave to a Kafka broker to read data from individual topics.

CREATE MATERIALIZED SOURCE IF NOT EXISTS source_abc (
column1 varchar,
column2 integer,
)
WITH (
connector='kafka',
kafka.topic='demo_topic',
kafka.brokers='172.10.1.1:9090,172.10.1.2:9090',
kafka.scan.startup.mode='latest',
kafka.time.offset='140000000',
kafka.consumer.group='demo_consumer_name'
)
ROW FORMAT JSON;

Help us make this doc better!