Ingest data from PostgreSQL CDC
Change Data Capture (CDC) refers to the process of identifying and capturing data changes in a database, then delivering the changes to a downstream service in real time.
The supported PostgreSQL versions are 10, 11, 12, 13, and 14.
You can ingest CDC data from PostgreSQL in two ways:
- Using the native PostgreSQL CDC connector: This connector is included in RisingWave. With this connector, RisingWave connects to PostgreSQL directly and reads changes from the write-ahead log (WAL) without starting additional services.
- Using a CDC tool and the Kafka connector: You can use the Debezium connector for PostgreSQL to publish changes to Kafka topics, and then use the Kafka connector in RisingWave to consume data from those topics.
Use the native PostgreSQL CDC connector
Set up PostgreSQL
The setup steps depend on whether your PostgreSQL instance is self-hosted or runs on AWS RDS.
Self-hosted
Ensure that the wal_level of your PostgreSQL is logical. Check it by using the following statement:
SHOW wal_level;
By default, it is replica. For CDC, you need to set it to logical, either in the database configuration file (postgresql.conf) or via a psql command. The following command changes the wal_level:
ALTER SYSTEM SET wal_level = logical;
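ALTER SYSTEM writes its settings to postgresql.auto.conf, which PostgreSQL reads after postgresql.conf, so the last active assignment wins. The following minimal Python sketch shows that precedence, assuming you can read both files as text. effective_setting is a hypothetical helper, and its parser is deliberately simplified (it ignores include directives and assumes values contain no #). Running SHOW wal_level; after the restart remains the authoritative check.

```python
import re

# Matches "name = value" or "name value"; the "=" is optional in postgresql.conf.
_SETTING = re.compile(r"^\s*([A-Za-z_][A-Za-z0-9_.]*)\s*=?\s*(.*?)\s*$")


def effective_setting(conf_texts, name, default):
    """Return the value of `name` after reading each config text in order.

    Pass postgresql.conf first and postgresql.auto.conf (written by
    ALTER SYSTEM) second: the last active assignment wins.
    Simplified sketch: ignores include directives, assumes no '#' in values.
    """
    value = default
    for text in conf_texts:
        for raw in text.splitlines():
            line = raw.split("#", 1)[0]  # drop comments, including commented-out defaults
            match = _SETTING.match(line)
            if match and match.group(1).lower() == name.lower():
                value = match.group(2).strip("'")  # values may be single-quoted
    return value


conf = "#wal_level = replica\nmax_wal_senders = 10\n"
auto_conf = "wal_level = 'logical'\n"
print(effective_setting([conf], "wal_level", "replica"))             # replica
print(effective_setting([conf, auto_conf], "wal_level", "replica"))  # logical
```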
Keep in mind that changing the wal_level requires a restart of the PostgreSQL instance. The logical level also adds extra information to the WAL, which can affect database performance.
Assign the REPLICATION, LOGIN, and CREATEDB role attributes to the user. For an existing user, run the following statement to assign the attributes:
ALTER USER <username> REPLICATION LOGIN CREATEDB;
For a new user, run the following statement to create the user and assign the attributes:
CREATE USER <username> REPLICATION LOGIN CREATEDB;
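If you generate these statements from a script, quote the user name as an identifier instead of pasting it in verbatim. The following is a minimal Python sketch; quote_identifier and role_statement are hypothetical helpers, not part of RisingWave or any PostgreSQL client library. Note that a quoted identifier is case-sensitive: "RW" (quoted) and rw name different roles.

```python
def quote_identifier(name: str) -> str:
    """Quote a PostgreSQL identifier: wrap it in double quotes, doubling embedded quotes."""
    return '"' + name.replace('"', '""') + '"'


def role_statement(username: str, exists: bool) -> str:
    """Build the ALTER USER / CREATE USER statement shown above for `username`."""
    verb = "ALTER" if exists else "CREATE"
    return f"{verb} USER {quote_identifier(username)} REPLICATION LOGIN CREATEDB;"


print(role_statement("rw", exists=True))
# ALTER USER "rw" REPLICATION LOGIN CREATEDB;
```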
You can check your role attributes by using the \du psql command:
dev-# \du
List of roles
Role name | Attributes | Member of
-----------+-----------------------------------------------------------+---------
rw | Create DB, Replication | {}
postgres | Superuser, Create role, Create DB, Replication, Bypass RLS | {}
Grant the required privileges to the user. Run the following statements:
GRANT CONNECT ON DATABASE <database_name> TO <username>;
GRANT USAGE ON SCHEMA <schema_name> TO <username>;
GRANT SELECT ON ALL TABLES IN SCHEMA <schema_name> TO <username>;
You can use the following statement to check the privileges the user has on the tables:
postgres=# SELECT table_name, grantee, privilege_type
FROM information_schema.role_table_grants
WHERE grantee='<username>';
An example result:
table_name | grantee | privilege_type
-----------+---------+----------------
lineitem | rw | SELECT
customer | rw | SELECT
nation | rw | SELECT
orders | rw | SELECT
part | rw | SELECT
partsupp | rw | SELECT
supplier | rw | SELECT
region | rw | SELECT
(8 rows)
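Both checks above can be automated once you have their output. The following Python sketch takes the Attributes column printed by \du and the rows returned by the role_table_grants query, and reports what is still missing. It assumes the \du formatting shown above, which can vary between psql versions: when a role has LOGIN, \du does not list it, whereas a role without it shows "Cannot login". The helper names are hypothetical.

```python
REQUIRED_ATTRIBUTES = ("Replication", "Create DB")


def missing_role_attributes(attributes: str) -> list:
    r"""Compare a \du Attributes value such as "Create DB, Replication" with what CDC needs."""
    present = {item.strip() for item in attributes.split(",") if item.strip()}
    missing = [attr for attr in REQUIRED_ATTRIBUTES if attr not in present]
    # \du prints no "Login" attribute; a role lacking LOGIN shows "Cannot login" instead.
    if "Cannot login" in present:
        missing.append("Login")
    return missing


def tables_without_select(expected_tables, grant_rows, username):
    """grant_rows are (table_name, grantee, privilege_type) tuples from role_table_grants."""
    granted = {table for table, grantee, privilege in grant_rows
               if grantee == username and privilege == "SELECT"}
    return sorted(set(expected_tables) - granted)


print(missing_role_attributes("Create DB, Replication"))  # []
print(tables_without_select(["orders", "lineitem"], [("orders", "rw", "SELECT")], "rw"))  # ['lineitem']
```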
AWS RDS
Here we use a standard-class instance without Multi-AZ deployment as an example.
- Check whether the wal_level parameter is set to logical. If it is, no further changes are needed. Otherwise, create a parameter group for your PostgreSQL instance. In this example, we created a parameter group named pg-cdc for an instance running PostgreSQL 12. Then click the pg-cdc parameter group and set the value of rds.logical_replication to 1.
- Go to the Databases page and modify your instance to use the pg-cdc parameter group.
- Click Continue and choose Apply immediately. Finally, click Modify DB instance to save the changes. Reboot the PostgreSQL instance for the changes to take effect.
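The console steps above can also be scripted. The following is a sketch that assumes the boto3 RDS client, valid AWS credentials, and an instance that does not already use a custom parameter group (otherwise you would modify that group instead). The function name and the instance identifier are hypothetical; the family postgres12 matches the PostgreSQL 12 example. The client is passed in so the function can be exercised with a stand-in object. It does not perform the initial wal_level check.

```python
def enable_logical_replication(rds, instance_id, group_name="pg-cdc", family="postgres12"):
    """Mirror the console steps using boto3 RDS client calls.

    `rds` is expected to be boto3.client("rds"); `family` must match the
    instance's major version (for example postgres12 for PostgreSQL 12).
    """
    # Create a custom parameter group for the instance's engine family.
    rds.create_db_parameter_group(
        DBParameterGroupName=group_name,
        DBParameterGroupFamily=family,
        Description="Logical replication for RisingWave CDC",
    )
    # rds.logical_replication is a static parameter, so it can only be
    # applied with "pending-reboot".
    rds.modify_db_parameter_group(
        DBParameterGroupName=group_name,
        Parameters=[{
            "ParameterName": "rds.logical_replication",
            "ParameterValue": "1",
            "ApplyMethod": "pending-reboot",
        }],
    )
    # Attach the parameter group to the instance ("Apply immediately").
    rds.modify_db_instance(
        DBInstanceIdentifier=instance_id,
        DBParameterGroupName=group_name,
        ApplyImmediately=True,
    )
    # Wait for the modification to finish, then reboot to apply the static parameter.
    rds.get_waiter("db_instance_available").wait(DBInstanceIdentifier=instance_id)
    rds.reboot_db_instance(DBInstanceIdentifier=instance_id)
```

With boto3 installed, a call would look like enable_logical_replication(boto3.client("rds"), "my-postgres-instance"), where the identifier is a placeholder.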

Enable the connector node in RisingWave
The native PostgreSQL CDC connector is implemented by the connector node in RisingWave. The connector node handles the connections with upstream and downstream systems. You can use the docker-compose configuration of the latest RisingWave demo, in which the connector node is enabled by default. To learn about how to start RisingWave with this configuration, see Docker Compose.
Create a table using the native CDC connector
To ensure all data changes are captured, you must create a table and specify primary keys. See the CREATE TABLE command for more details. The data format must be Debezium JSON.
Syntax
CREATE TABLE [ IF NOT EXISTS ] source_name (
column_name data_type PRIMARY KEY , ...
PRIMARY KEY ( column_name, ... )
)
WITH (
connector='postgres-cdc',
<field>=<value>, ...
);
Note that a primary key is required.
WITH parameters
Unless specified otherwise, the fields listed are required.
Field | Notes |
---|---|
hostname | Hostname of the database. |
port | Port number of the database. |
username | Username of the database. |
password | Password of the database. |
database.name | Name of the database. |
schema.name | Optional. Name of the schema. By default, the value is public. |
table.name | Name of the table that you want to ingest data from. |
slot.name | Optional. Name of the replication slot for this PostgreSQL source. If not specified, a slot name is generated randomly. Each source should have a unique slot name. |
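If you create many CDC tables, rendering the statement from a script helps catch a missing required field before RisingWave does. The following Python sketch is a hypothetical helper, not a RisingWave API. It checks the required fields from the table above, requires a primary key, and validates slot.name against PostgreSQL's rule that replication slot names contain only lowercase letters, digits, and underscores, up to 63 characters.

```python
import re

REQUIRED_FIELDS = ("hostname", "port", "username", "password", "database.name", "table.name")
# PostgreSQL replication slot names: lowercase letters, digits, underscores,
# at most 63 characters (NAMEDATALEN - 1).
SLOT_NAME = re.compile(r"[a-z0-9_]{1,63}")


def quote_literal(value: str) -> str:
    """Single-quote a SQL string literal, doubling embedded quotes."""
    return "'" + value.replace("'", "''") + "'"


def postgres_cdc_table(table, columns, primary_key, options):
    """Render CREATE TABLE ... WITH (connector = 'postgres-cdc', ...).

    columns: list of (name, data_type); primary_key: list of column names;
    options: dict of the WITH fields listed in the table above.
    """
    missing = [field for field in REQUIRED_FIELDS if field not in options]
    if missing:
        raise ValueError("missing required fields: " + ", ".join(missing))
    if not primary_key:
        raise ValueError("a primary key is required")
    slot = options.get("slot.name")
    if slot is not None and not SLOT_NAME.fullmatch(slot):
        raise ValueError(f"invalid slot name: {slot!r}")
    body = [f"  {name} {data_type}" for name, data_type in columns]
    body.append(f"  PRIMARY KEY ({', '.join(primary_key)})")
    props = ["  connector = 'postgres-cdc'"]
    props += [f"  {field} = {quote_literal(str(value))}" for field, value in options.items()]
    return ("CREATE TABLE " + table + " (\n" + ",\n".join(body)
            + "\n) WITH (\n" + ",\n".join(props) + "\n);")


print(postgres_cdc_table(
    "shipments",
    [("shipment_id", "integer"), ("order_id", "integer"), ("is_arrived", "boolean")],
    ["shipment_id"],
    {"hostname": "127.0.0.1", "port": "5432", "username": "postgres",
     "password": "postgres", "database.name": "dev", "schema.name": "public",
     "table.name": "shipments", "slot.name": "shipments"},
))
```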
Data format
Data is in Debezium JSON format. Debezium is a log-based CDC tool that can capture row changes from various database management systems such as PostgreSQL, MySQL, and SQL Server and generate events with consistent structures in real time. The PostgreSQL CDC connector in RisingWave supports JSON as the serialization format for Debezium data. The data format does not need to be specified when creating a table with postgres-cdc as the source.
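RisingWave parses these events itself, so the following is only an illustration of the Debezium JSON envelope, not RisingWave's implementation. Each event carries before, after, and an op code, and the envelope may be wrapped in a payload field when JSON schemas are enabled in Kafka Connect. describe_change is a hypothetical helper. For PostgreSQL deletes, before contains only the primary key columns unless the table's REPLICA IDENTITY is FULL.

```python
import json

# Debezium "op" codes: c = create (insert), u = update, d = delete,
# r = read (a row captured during the initial snapshot).
OPERATIONS = {"c": "INSERT", "r": "INSERT", "u": "UPDATE", "d": "DELETE"}


def describe_change(message: str):
    """Return (operation, row) for one Debezium JSON change event."""
    event = json.loads(message)
    # With JSON schemas enabled, the envelope is wrapped as {"schema": ..., "payload": ...}.
    payload = event.get("payload", event)
    operation = OPERATIONS.get(payload.get("op"))
    if operation is None:
        raise ValueError(f"unsupported op: {payload.get('op')!r}")
    # Deletes carry the old row in "before"; the other operations carry the new row in "after".
    row = payload["before"] if operation == "DELETE" else payload["after"]
    return operation, row


event = {"payload": {"before": None,
                     "after": {"shipment_id": 1, "order_id": 7, "is_arrived": False},
                     "op": "c", "ts_ms": 1670000000000}}
print(describe_change(json.dumps(event)))
# ('INSERT', {'shipment_id': 1, 'order_id': 7, 'is_arrived': False})
```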
Example
CREATE TABLE shipments (
shipment_id integer,
order_id integer,
origin string,
destination string,
is_arrived boolean,
PRIMARY KEY (shipment_id)
) WITH (
connector = 'postgres-cdc',
hostname = '127.0.0.1',
port = '5432',
username = 'postgres',
password = 'postgres',
database.name = 'dev',
schema.name = 'public',
table.name = 'shipments',
slot.name = 'shipments'
);
Use the Debezium connector for PostgreSQL
Set up PostgreSQL
Before using the Debezium connector for PostgreSQL, you need to complete several configurations for PostgreSQL. For details, see Set up PostgreSQL, which covers both self-hosted PostgreSQL and AWS RDS.
Deploy the Debezium connector for PostgreSQL
You need to download and configure the Debezium connector for PostgreSQL, and then add the configuration to your Kafka Connect cluster. For details, see the Deployment section.
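As an illustration of the configuration step, the following Python sketch builds a Kafka Connect REST request body for the Debezium PostgreSQL connector and derives the Kafka topic name to use in RisingWave. It assumes Debezium 2.x property names (Debezium 1.x used database.server.name instead of topic.prefix) and the pgoutput logical decoding plugin built into PostgreSQL 10 and later. The connector name, topic prefix, and helper names are hypothetical; consult the Debezium documentation for the full set of properties.

```python
import json


def debezium_postgres_connector(name, host, port, user, password, dbname, topic_prefix, tables):
    """Request body for POST /connectors on a Kafka Connect worker (Debezium 2.x names)."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "plugin.name": "pgoutput",  # built-in logical decoding plugin
            "database.hostname": host,
            "database.port": str(port),
            "database.user": user,
            "database.password": password,
            "database.dbname": dbname,
            "topic.prefix": topic_prefix,
            "table.include.list": ",".join(tables),  # schema-qualified table names
        },
    }


def topic_for(topic_prefix, schema, table):
    # Debezium writes each table's changes to <topic.prefix>.<schema>.<table>.
    return f"{topic_prefix}.{schema}.{table}"


body = debezium_postgres_connector("pg-shipments", "127.0.0.1", 5432, "postgres",
                                   "postgres", "dev", "pg", ["public.shipments"])
print(json.dumps(body, indent=2))
print(topic_for("pg", "public", "shipments"))  # pg.public.shipments
```

You would send the JSON body to the /connectors endpoint of your Kafka Connect worker (port 8083 by default), and use the resulting topic name as the topic parameter of the RisingWave table.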
Create a table using the Kafka connector
To ensure all data changes are captured, you must create a table and specify primary keys. See the CREATE TABLE command for more details. The data format must be Debezium JSON.
CREATE TABLE source_name (
column1 varchar,
column2 integer,
PRIMARY KEY (column1)
)
WITH (
connector='kafka',
topic='user_test_topic',
properties.bootstrap.server='172.10.1.1:9090,172.10.1.2:9090',
scan.startup.mode='earliest',
properties.group.id='demo_consumer_name'
)
ROW FORMAT DEBEZIUM_JSON;