Last active
August 16, 2018 13:57
-
-
Save miguno/e6048db8b5a55b334947a848c63ca884 to your computer and use it in GitHub Desktop.
Using KSQL to join data sources with different formats, e.g. a stream with Avro and a table with JSON data.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Stream of page-view events backed by 'pageviews-topic'; values are Avro-encoded.
-- ("..." marks further columns elided from this example.)
CREATE STREAM pageviews (viewtime BIGINT, user_id VARCHAR, ...)
  WITH (KAFKA_TOPIC='pageviews-topic', VALUE_FORMAT='AVRO');

-- Table of users backed by 'users-topic'; values are JSON-encoded and
-- user_id is declared as the table's key column.
CREATE TABLE users (user_id VARCHAR, registertime BIGINT, ...)
  WITH (KAFKA_TOPIC='users-topic', KEY='user_id', VALUE_FORMAT='JSON');

-- Derived stream joining each page view to its user row. The two sources use
-- different value formats (Avro stream, JSON table). LEFT JOIN keeps page
-- views that have no matching user; their user-side columns are null.
-- The source column is user_id; it is exposed as userid in the output.
CREATE STREAM pageviews_enriched AS
  SELECT pv.viewtime, pv.user_id AS userid, ...
  FROM pageviews pv
  LEFT JOIN users ON users.user_id = pv.user_id;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
ksql> CREATE STREAM PLANSTREAM WITH (KAFKA_TOPIC='NBC_APPS.TBL_MS_PLAN_WK',VALUE_FORMAT='AVRO');
ksql> describe PLANSTREAM;
Name : PLANSTREAM
Field | Type
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
TABLE1 | VARCHAR(STRING)
OP_TYPE | VARCHAR(STRING)
OP_TS | VARCHAR(STRING)
CURRENT_TS | VARCHAR(STRING)
POS | VARCHAR(STRING)
PLAN_ID | BIGINT
PLAN_NAME | VARCHAR(STRING)
ORIGINAL_PLAN_ID | BIGINT
PORTFOLIO_PLAN_ID | BIGINT
STEW_LINK_ID | BIGINT
PRG_PLAN_ID | VARCHAR(STRING)
REVISION_NO | BIGINT
PROPERTY_ID | BIGINT
CHANNEL_DIVISION_ID | BIGINT
CHANNEL_SUB_DIVISION_ID | BIGINT
START_DATE | VARCHAR(STRING)
END_DATE | VARCHAR(STRING)
QUARTER_START | VARCHAR(STRING)
QUARTER_END | VARCHAR(STRING)
PLAN_CLASS_CODE | BIGINT
PLAN_CLASS | VARCHAR(STRING)
MARKETPLACE_CODE | BIGINT
ADVERTISER_ID | BIGINT
ksql> CREATE table ADVERTISERTABLE
(table1 varchar,
op_type varchar,
op_ts varchar,
current_ts varchar,
pos varchar,
advertiser_id BIGINT ,
advertiser_name varchar,
advertiser_creation_dt varchar,
advertiser_modification_dt varchar,
create_date varchar,
accounting_ident varchar)
WITH (KAFKA_TOPIC='NBC_APPS.TBL_MS_ADVERTISER_COMPACT',
VALUE_FORMAT='JSON',
KEY = 'advertiser_id');
ksql> describe ADVERTISERTABLE;
Name : ADVERTISERTABLE
Field | Type
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
TABLE1 | VARCHAR(STRING)
OP_TYPE | VARCHAR(STRING)
OP_TS | VARCHAR(STRING)
CURRENT_TS | VARCHAR(STRING)
POS | VARCHAR(STRING)
ADVERTISER_ID | INTEGER
ADVERTISER_NAME | VARCHAR(STRING)
ADVERTISER_CREATION_DT | VARCHAR(STRING)
ADVERTISER_MODIFICATION_DT | VARCHAR(STRING)
CREATE_DATE | VARCHAR(STRING)
ACCOUNTING_IDENT | VARCHAR(STRING)
ksql> select * from ADVERTISERTABLE where ADVERTISER_ID = 15019;
1533693600068 |
15019 | NBC_APPS.TBL_MS_ADVERTISER | U | 2018-07-18 18:51:34.000000 | 2018-07-18 14:51:39.397000 | 00000003470236836100 | 15019 | Hanesbrands, INC | 2010-11-18:00:00:00 | 2018-01-29:00:00:00 | 2018-07-02:10:52:15 | 0017000000NoYWVAA3
ksql> select * from PLANSTREAM where ADVERTISER_ID = 15019;
1533693225559 |
382591 | NBC_APPS.TBL_MS_PLAN_WK | U | 2018-07-18 18:51:34.000000 | 2018-07-18 14:51:39.397000 | 00000003470236836100 | 382591 | UNVSO Hanes 17/18 UF NBC Universo (382446)-V2 | 382446 | null | null | null | -1 | 10247 | 128 | -1 | 25-JUN-18 | 26-JUN-18 | 02-APR-18 | 01-JUL-18 | 1170001 | National | 1599002 | 15019
ksql> select a.ADVERTISER_ID,b.ADVERTISER_ID,plan_id from PLANSTREAM a left join ADVERTISERTABLE b on (a.ADVERTISER_ID = b.ADVERTISER_ID);
15019 | null | 382591
Both the stream and the table contain a record with advertiser_id = 15019, but the join does not match them: the table-side column comes back as null.
We also tried the approach described at https://stackoverflow.com/questions/50102662/confluent-4-1-0-ksql-stream-table-join-table-data-null/50103660#50103660, but it did not work either.
In short, joining an Avro-format stream with a JSON-format table is not working for us.