Skip to main content

CDC

Use the SQL statement below to connect RisingWave to a CDC source.

note

Currently, RisingWave only supports materialized CDC sources with primary keys, and the data format must be Debezium JSON.

Syntax

CREATE MATERIALIZED SOURCE [ IF NOT EXISTS ] source_name (
column_name data_type [ PRIMARY KEY ], ...
PRIMARY KEY ( column_name, ... )
)
WITH (
connector='kafka',
field_name='value', ...
)
ROW FORMAT DEBEZIUM JSON;

WITH options

FieldDefaultTypeDescriptionRequired?
kafka.topicNoneStringAddress of the Kafka topic. One source can only correspond to one topic.True
kafka.brokersNoneStringAddress of the Kafka broker. Format: 'ip:port,ip:port'.True
kafka.scan.startup.modeearliestStringThe Kafka consumer starts consuming data from the commit offset. This includes two values: 'earliest' and 'latest'.False
kafka.time.offsetNoneInt64Specify the offset in seconds from a certain point of time.False
kafka.consumer.groupNoneStringName of the Kafka consumer groupTrue

Example

Here is an example of connecting RisingWave to a CDC service to read data from individual streams.

CREATE MATERIALIZED SOURCE [IF NOT EXISTS] source_name (
column1 varchar,
column2 integer,
PRIMARY KEY (column1)
)
WITH (
connector='kafka',
kafka.topic='user_test_topic',
kafka.brokers='172.10.1.1:9090,172.10.1.2:9090',
kafka.scan.startup.mode='earliest',
kafka.consumer.group='demo_consumer_name'
)
ROW FORMAT DEBEZIUM JSON;

Help us make this doc better!