Skip to content

Instantly share code, notes, and snippets.

@IL-ADAM
Created September 19, 2023 13:06
Show Gist options
  • Save IL-ADAM/2671c125a743de32054640c271d2dcf1 to your computer and use it in GitHub Desktop.
Save IL-ADAM/2671c125a743de32054640c271d2dcf1 to your computer and use it in GitHub Desktop.
Add user merged stream
-- Start reading source topics from the earliest offset. SET only applies to
-- statements issued after it, so it has to precede the persistent query below
-- for that query to pick it up.
SET 'auto.offset.reset' = 'earliest';

-- user_merged_stream: re-keys user_OUT_STREAM by user_id with every double-quote
-- character removed, and writes the rows as JSON to the templated Kafka topic.
-- The key expression in the projection matches the PARTITION BY expression, and
-- AS_VALUE additionally copies the cleaned id into the value as `user_id`.
-- Column types are inherited from the source stream; a SELECT list cannot
-- declare types, so the projection lists bare column names.
CREATE STREAM IF NOT EXISTS user_merged_stream WITH (
    KAFKA_TOPIC = '{{ cookiecutter.user_merged_topic }}',
    VALUE_FORMAT = 'JSON'
)
AS
SELECT
    REPLACE(`user_id`, '"', '') AS KEY,
    AS_VALUE(REPLACE(`user_id`, '"', '')) AS `user_id`,
    `first_name`,
    `last_name`,
    `app_id`
FROM user_OUT_STREAM
PARTITION BY REPLACE(`user_id`, '"', '');
-- Feed USER_IN_STREAM into user_merged_stream (the stream created by the
-- CREATE STREAM ... AS SELECT statement earlier in this file), keyed by user_id.
-- The projection has the same column names and order as that stream:
-- KEY, user_id, first_name, last_name, app_id. Types come from the source
-- stream; a SELECT list cannot declare them.
-- QUERY_ID gives the resulting persistent query a fixed, predictable id.
INSERT INTO user_merged_stream
WITH (QUERY_ID = 'USER_IN_INTO_USER_MERGED')
SELECT
    `user_id` AS KEY,
    AS_VALUE(`user_id`) AS `user_id`,
    `first_name`,
    `last_name`,
    `app_id`
FROM USER_IN_STREAM
PARTITION BY `user_id`;

-- Applies to any statements issued after this point in the same session.
SET 'auto.offset.reset' = 'earliest';
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment