
Ingest data from S3 buckets

Use the SQL statement below to connect RisingWave to an Amazon S3 source. RisingWave supports both CSV and ndjson file formats.

The S3 connector does not guarantee the sequential reading of files or complete file reading.

Syntax

CREATE SOURCE [ IF NOT EXISTS ] source_name
schema_definition
[ INCLUDE { header | key | offset | partition | timestamp } [ AS <column_name> ] ]
WITH (
    connector = { 's3' | 's3_v2' },
    connector_parameter = 'value', ...
)
FORMAT data_format ENCODE data_encode (
    without_header = 'true' | 'false',
    delimiter = 'delimiter'
);
Info: For CSV data, specify the delimiter in the delimiter option in ENCODE properties.

schema_definition:

(
column_name data_type [ PRIMARY KEY ], ...
[ PRIMARY KEY ( column_name, ... ) ]
)
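
As a quick illustration of the syntax above, the following statement defines a source over CSV files and uses the INCLUDE clause to expose the file offset as an extra column. The bucket, credentials, and column names are placeholders, not values from a real deployment.

CREATE SOURCE IF NOT EXISTS s3_demo_source (
    id int,
    name varchar
)
INCLUDE offset AS file_offset -- optional metadata column, per the syntax above
WITH (
    connector = 's3_v2',
    s3.region_name = 'us-east-1',      -- placeholder region
    s3.bucket_name = 'example-bucket', -- placeholder bucket
    s3.credentials.access = 'xxxxx',
    s3.credentials.secret = 'xxxxx'
) FORMAT PLAIN ENCODE CSV (
    without_header = 'true',
    delimiter = ','
);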

Parameters

| Field | Notes |
|---|---|
| connector | Required. Choose between the s3 and s3_v2 (recommended) connectors. Learn more about s3_v2. |
| s3.region_name | Required. The service region. |
| s3.bucket_name | Required. The name of the bucket where the source data is stored. |
| s3.credentials.access | Required. The AWS access key ID. |
| s3.credentials.secret | Required. The AWS secret access key. |
| match_pattern | Conditional. Ingests only the object keys in s3.bucket_name that match the given pattern. Standard Unix-style glob syntax is supported. |
| s3.endpoint_url | Conditional. The host URL of an S3-compatible object storage server, for use with a server other than standard AWS S3. |
Note: Empty cells in CSV files will be parsed to NULL.
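
For instance, to read from an S3-compatible server while ingesting only the objects that match a glob pattern, you can combine s3.endpoint_url and match_pattern. The endpoint, bucket, and pattern below are placeholder values for illustration.

CREATE SOURCE s3_compatible_source (
    id int,
    name varchar
)
WITH (
    connector = 's3_v2',
    s3.region_name = 'us-east-1',
    s3.bucket_name = 'example-bucket',
    s3.credentials.access = 'xxxxx',
    s3.credentials.secret = 'xxxxx',
    s3.endpoint_url = 'http://127.0.0.1:9000', -- placeholder S3-compatible endpoint
    match_pattern = 'data/*.csv'               -- Unix-style glob over object keys
) FORMAT PLAIN ENCODE CSV (
    without_header = 'true',
    delimiter = ','
);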

| Field | Notes |
|---|---|
| data_format | Supported data format: PLAIN. |
| data_encode | Supported data encodes: CSV, JSON. |
| without_header | Whether the first line is the header. Accepted values: 'true', 'false'. Default: 'true'. |
| delimiter | How RisingWave splits contents. For JSON encode, the delimiter is \n. |
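
For ndjson files, use ENCODE JSON: each line is parsed as one JSON record, so no delimiter needs to be specified. A minimal sketch with placeholder columns, bucket, and credentials:

CREATE TABLE s3_json_table (
    id int,
    name varchar
)
WITH (
    connector = 's3_v2',
    s3.region_name = 'us-east-1',
    s3.bucket_name = 'example-bucket', -- placeholder bucket holding ndjson files
    s3.credentials.access = 'xxxxx',
    s3.credentials.secret = 'xxxxx'
) FORMAT PLAIN ENCODE JSON;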

s3_v2 connector

BETA FEATURE

The s3_v2 connector is currently in Beta. Please contact us if you encounter any issues or have feedback.

The s3 connector treats files as splits, resulting in poor scalability and potential timeouts when dealing with a large number of files.

The s3_v2 connector is designed to address the scalability and performance limitations of the s3 connector by implementing a more efficient listing and fetching mechanism. If you want to explore the technical details of this new approach, refer to the design document.

Example

Here is an example of connecting RisingWave to an S3 source to read data from CSV files in a bucket.

CREATE TABLE s(
    id int,
    name varchar,
    age int
)
WITH (
    connector = 's3_v2',
    s3.region_name = 'ap-southeast-2',
    s3.bucket_name = 'example-s3-source',
    s3.credentials.access = 'xxxxx',
    s3.credentials.secret = 'xxxxx'
) FORMAT PLAIN ENCODE CSV (
    without_header = 'true',
    delimiter = ',' -- set delimiter = E'\t' for tab-separated files
);
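
Because this example creates a table rather than a bare source, the ingested rows can be queried directly once the files are consumed, for example:

SELECT count(*) FROM s;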

Important considerations

Object filtering in S3 buckets

RisingWave has a prefix argument designed for filtering objects in the S3 bucket. It relies on Apache OpenDAL, whose prefix filter implementation is expected to be released soon.

Source file name as column

A feature to create a column with the source file name is currently under development. You can track the progress here.

Handling new files in the bucket

RisingWave automatically ingests new files added to the bucket. However, it does not detect an update if a file is deleted and a new file with the same name is added at the same time. Additionally, RisingWave ignores file deletions.

Reading data from the source

To read the data, you need to create a materialized view from the source, or create a table with the S3 connector directly. Here are some examples:

-- Create a materialized view from the source
CREATE SOURCE s3_source WITH ( connector = 's3_v2', ... );
CREATE MATERIALIZED VIEW mv AS SELECT * FROM s3_source;

-- Create a table with the S3 connector
CREATE TABLE s3_table ( ... ) WITH ( connector = 's3_v2', ... );
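
In either case, the data can then be queried with regular SQL, for example against the materialized view above:

SELECT * FROM mv LIMIT 10;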

Handling unexpected file types or poorly formatted files

RisingWave will attempt to interpret and parse every file as CSV or ndjson according to the specified rules, regardless of the file's actual type. Warnings will be reported for the parts of a file that cannot be parsed, but the source itself will not fail; poorly formatted parts of a file will be discarded.
