Ingest data from Google Pub/Sub

Use the SQL statement below to connect RisingWave to a Google Pub/Sub source.

Syntax

CREATE {TABLE | SOURCE} [ IF NOT EXISTS ] source_name 
[ schema_definition ]
WITH (
connector='google_pubsub',
connector_parameter='value', ...
)
FORMAT data_format ENCODE data_encode (
message = 'message',
schema.location = 'location' | schema.registry = 'schema_registry_url'
);

Parameters

Field | Note
pubsub.subscription | Required. Specifies the Pub/Sub subscription to consume messages from. Ensure the subscription is configured with the retain-on-ack property to enable message replay.
pubsub.credentials | Required. A JSON string containing the service account credentials for authorization. See the service-account credentials guide. The provided account credential must have the pubsub.subscriber role.
pubsub.start_offset.nanos | Optional. Cannot be set together with pubsub.start_snapshot. Specifies a numeric timestamp in nanoseconds, ideally the publish timestamp of a message in the subscription. If present, the connector seeks the subscription to the timestamp and starts consuming from there. Note that the seek operation is limited by the message retention policy of the subscription.
pubsub.start_snapshot | Optional. Cannot be set together with pubsub.start_offset.nanos. If present, the connector first seeks to the specified snapshot before starting consumption.
note

The Pub/Sub subscription provided must have retain_acked_messages enabled and must define a retention policy. For details, see Configure subscription message retention.

info

The Pub/Sub source provides at-least-once semantics rather than exactly-once semantics, because the SDK cannot seek back to a specific message offset.

Here's how the recovery process works:

  1. Record the timestamp for each message as an offset.

  2. During recovery, seek back to the specific timestamp and consume messages again from the topic.

Because the seek is based on a timestamp rather than on an individual message, messages produced at almost the same time may each be received more than once after recovery.
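Since redelivery is possible, downstream logic may need to tolerate duplicates. One possible approach, sketched below, is to declare a primary key on the table so that a redelivered message maps to the same row instead of producing a second one. This is only an illustration under several assumptions: the table name pubsub_events and the columns event_id and payload are hypothetical, every message is assumed to carry a unique event_id field, the credentials value is a placeholder, and the sketch relies on a later row with the same primary key replacing the earlier one.

```sql
-- Hypothetical table and columns; assumes each message carries a unique event_id.
CREATE TABLE pubsub_events (
    event_id varchar PRIMARY KEY,  -- a redelivered message reuses the same key
    payload varchar
) WITH (
    connector = 'google_pubsub',
    pubsub.subscription = 'test-subscription-1',
    -- Placeholder: replace with the service account credentials JSON string.
    pubsub.credentials = '<service-account-json>'
) FORMAT PLAIN ENCODE JSON;
```

If messages have no natural unique identifier, this approach does not apply and duplicates need to be handled in the consuming queries instead.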

Examples

CREATE TABLE s1 (v1 int, v2 varchar) WITH (
connector = 'google_pubsub',
pubsub.subscription = 'test-subscription-1',
pubsub.credentials = '<service-account-json>'
) FORMAT PLAIN ENCODE JSON;
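
The optional start position can be sketched in the same way. The statement below assumes a source named s2 (hypothetical), a placeholder credentials string, and that the nanosecond timestamp is passed as a quoted value like the other connector parameters. The timestamp 1700000000000000000 corresponds to 2023-11-14 22:13:20 UTC and is only an illustrative value; it must fall within the retention window of the subscription for the seek to succeed.

```sql
-- Hypothetical source name; the credentials value is a placeholder.
CREATE SOURCE s2 (v1 int, v2 varchar) WITH (
    connector = 'google_pubsub',
    pubsub.subscription = 'test-subscription-1',
    pubsub.credentials = '<service-account-json>',
    -- Seek the subscription to this publish timestamp (in nanoseconds) before consuming.
    -- Cannot be combined with pubsub.start_snapshot.
    pubsub.start_offset.nanos = '1700000000000000000'
) FORMAT PLAIN ENCODE JSON;
```

To start from a snapshot instead, replace the pubsub.start_offset.nanos line with pubsub.start_snapshot and the name of the snapshot.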
