Request feature: sink type = scd type2 #19924

Open
viethqb opened this issue Dec 24, 2024 · 3 comments

Comments

viethqb commented Dec 24, 2024

Is your feature request related to a problem? Please describe.

In data analysis, SCD (slowly changing dimension) is an important concept with many applications. RisingWave currently supports two main sink types: append-only and upsert. I think an SCD Type 2 sink type would give RisingWave a real advantage over other systems.
For example: MySQL CDC => RisingWave => Iceberg/Doris/StarRocks/BigQuery (SCD Type 2). This would be awesome.

Describe the solution you'd like

I think the solution could be: the source table must have an ID column, and the sink table gets three additional columns: __rw_valid_flag, __rw_valid_from, and __rw_valid_to.
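A minimal Python sketch of the proposed semantics. It assumes the column names proposed above (`__rw_valid_flag`, `__rw_valid_from`, `__rw_valid_to`) plus an illustrative event encoding (`'insert'`, `'update'`, `'delete'`), and the `apply_change` helper is hypothetical, not a RisingWave API. Each change closes the key's currently valid version and, unless it is a delete, opens a new one.

```python
from datetime import datetime
from typing import Any, Dict, List, Optional

# Hypothetical row shape: the source ID column, payload columns, and the
# three proposed SCD Type 2 columns from the feature request.
Row = Dict[str, Any]


def apply_change(history: List[Row], op: str, key: int,
                 values: Optional[Dict[str, Any]], ts: datetime) -> None:
    """Apply one CDC event for `key` to an in-memory SCD Type 2 table.

    `op` is 'insert', 'update' or 'delete' (an illustrative encoding).
    """
    # Close the currently valid version of this key, if there is one.
    for row in history:
        if row["id"] == key and row["__rw_valid_flag"]:
            row["__rw_valid_flag"] = False
            row["__rw_valid_to"] = ts
    # Inserts and updates open a new version; a delete only closes the old one.
    if op in ("insert", "update"):
        history.append({
            "id": key,
            **(values or {}),
            "__rw_valid_flag": True,
            "__rw_valid_from": ts,
            "__rw_valid_to": None,
        })


history: List[Row] = []
apply_change(history, "insert", 1, {"v": 2}, datetime(2024, 1, 1, 0, 0, 1))
apply_change(history, "update", 1, {"v": 3}, datetime(2024, 1, 1, 0, 0, 2))
apply_change(history, "insert", 2, {"v": 1}, datetime(2024, 1, 1, 0, 0, 3))
apply_change(history, "delete", 1, None, datetime(2024, 1, 1, 0, 0, 4))
```

After these four events, key 1 has two closed versions and key 2 has one open version. The linear scan keeps the sketch short; a real sink would look up the open version by key.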

Describe alternatives you've considered

No response

Additional context

No response

github-actions bot added this to the release-2.2 milestone on Dec 24, 2024

xxchan (Member) commented Dec 26, 2024

cc @neverchanje @st1page

xxchan (Member) commented Dec 26, 2024

I came up with a solution that works today:

```sql
-- your original CDC table
create table t(pk int primary key, v int);

-- turn the CDC table into an append-only changelog stream
-- and add a processing-time timestamp column
create table t_changelog(pk int, v int, inserted_at timestamptz as proctime());
create sink s into t_changelog as
with c as changelog from t
select pk, v from c
where changelog_op = 1 or changelog_op = 3
with (type = 'append-only');

-- final SCD Type 2 sink to your system
CREATE SINK s_scd2 AS
select *, end_at is null as is_valid from (
    select *, lag(inserted_at) over (partition by pk order by inserted_at desc) as end_at
    from t_changelog
)
WITH (...);
```
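The `s` sink keeps only some changelog rows and stamps each kept row with `proctime()`. Below is a small Python model of that step, not RisingWave code. The op-code meanings are an assumption consistent with the filter (1 = insert, 3 = new row of an update), `to_changelog` is a hypothetical helper, and an increasing counter stands in for `proctime()`.

```python
import itertools
from typing import Callable, Iterable, Iterator, Tuple

# Assumed op codes, consistent with `changelog_op = 1 or changelog_op = 3`:
# 1 = insert, 3 = new row of an update. Every other op is dropped.
KEPT_OPS = {1, 3}

Event = Tuple[int, int, int]  # (changelog_op, pk, v)


def to_changelog(events: Iterable[Event],
                 clock: Callable[[], int]) -> Iterator[Tuple[int, int, int]]:
    """Yield (pk, v, inserted_at) for kept events; `clock` stands in for proctime()."""
    for op, pk, v in events:
        if op in KEPT_OPS:
            yield (pk, v, clock())


tick = itertools.count(1)
# insert, assumed old row of an update, new row of the update, assumed delete
events = [(1, 1, 2), (4, 1, 2), (3, 1, 3), (2, 1, 3)]
changelog = list(to_changelog(events, lambda: next(tick)))
print(changelog)  # [(1, 2, 1), (1, 3, 2)]
```

With this filter, a delete produces no changelog row. As a result, the last version of a deleted key would still come out with `is_valid = true` in the final sink.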

With sample data:

```
dev=> insert into t values (1,2);
INSERT 0 1
dev=> insert into t values (1,3);
INSERT 0 1
dev=> insert into t values (1,4);
INSERT 0 1
dev=> insert into t values (2,1);
INSERT 0 1
dev=> select *, end_at is null as is_valid from (
    select *, lag(inserted_at) over (partition by pk order by inserted_at desc) as end_at
    from t_changelog
);
┌────┬───┬───────────────────────────────┬───────────────────────────────┬──────────┐
│ pk │ v │          inserted_at          │           end_at              │ is_valid │
├────┼───┼───────────────────────────────┼───────────────────────────────┼──────────┤
│  1 │ 4 │ 2024-12-26 03:14:55.993+00:00 │ ∅                             │ t        │
│  1 │ 3 │ 2024-12-26 03:14:54.244+00:00 │ 2024-12-26 03:14:55.993+00:00 │ f        │
│  1 │ 2 │ 2024-12-26 03:14:51.744+00:00 │ 2024-12-26 03:14:54.244+00:00 │ f        │
│  2 │ 1 │ 2024-12-26 03:14:59.244+00:00 │ ∅                             │ t        │
└────┴───┴───────────────────────────────┴───────────────────────────────┴──────────┘
(4 rows)
```
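The `end_at` column follows from `lag(inserted_at)` over a partition sorted newest-first. In that ordering, each version's "previous" row is the next newer version of the same key, so its `inserted_at` becomes this version's `end_at`. The newest version has no previous row and gets NULL. Below is a Python model of that window, fed with the sample rows above. `scd2_view` is a hypothetical helper, and the timestamps are plain strings without the `+00:00` offset. Strings sort correctly here because they all share one format.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

ChangelogRow = Tuple[int, int, str]  # (pk, v, inserted_at)
ViewRow = Tuple[int, int, str, Optional[str], bool]  # + (end_at, is_valid)


def scd2_view(changelog: List[ChangelogRow]) -> List[ViewRow]:
    """Model `lag(inserted_at) over (partition by pk order by inserted_at desc)`."""
    partitions: Dict[int, List[ChangelogRow]] = defaultdict(list)
    for row in changelog:
        partitions[row[0]].append(row)
    out: List[ViewRow] = []
    for pk in sorted(partitions):
        newer: Optional[str] = None  # lag() of the first row in a partition is NULL
        for _, v, inserted_at in sorted(partitions[pk], key=lambda r: r[2], reverse=True):
            out.append((pk, v, inserted_at, newer, newer is None))
            newer = inserted_at
    return out


sample = [
    (1, 2, "2024-12-26 03:14:51.744"),
    (1, 3, "2024-12-26 03:14:54.244"),
    (1, 4, "2024-12-26 03:14:55.993"),
    (2, 1, "2024-12-26 03:14:59.244"),
]
for row in scd2_view(sample):
    print(row)
```

The printed rows match the four rows of the query output above. SQL does not guarantee that row order without an `order by`; the model simply sorts by key.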

viethqb (Author) commented Dec 26, 2024

@xxchan Thank you so much!
