@miguno
Last active August 16, 2018 13:57
Using KSQL to join data sources with different formats, e.g. a stream with Avro data and a table with JSON data.
CREATE STREAM pageviews (viewtime BIGINT, user_id VARCHAR, ...) WITH (KAFKA_TOPIC='pageviews-topic', VALUE_FORMAT='AVRO');
CREATE TABLE users (user_id VARCHAR, registertime BIGINT, ...) WITH (KAFKA_TOPIC='users-topic', KEY='user_id', VALUE_FORMAT='JSON');
CREATE STREAM pageviews_enriched AS
SELECT pv.viewtime, pv.user_id AS userid, ... FROM pageviews pv
LEFT JOIN users ON users.user_id = pv.user_id;
@karthikeyanrd27

ksql> CREATE STREAM PLANSTREAM WITH (KAFKA_TOPIC='NBC_APPS.TBL_MS_PLAN_WK',VALUE_FORMAT='AVRO');

ksql> describe PLANSTREAM;

Name : PLANSTREAM
Field | Type

ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
TABLE1 | VARCHAR(STRING)
OP_TYPE | VARCHAR(STRING)
OP_TS | VARCHAR(STRING)
CURRENT_TS | VARCHAR(STRING)
POS | VARCHAR(STRING)
PLAN_ID | BIGINT
PLAN_NAME | VARCHAR(STRING)
ORIGINAL_PLAN_ID | BIGINT
PORTFOLIO_PLAN_ID | BIGINT
STEW_LINK_ID | BIGINT
PRG_PLAN_ID | VARCHAR(STRING)
REVISION_NO | BIGINT
PROPERTY_ID | BIGINT
CHANNEL_DIVISION_ID | BIGINT
CHANNEL_SUB_DIVISION_ID | BIGINT
START_DATE | VARCHAR(STRING)
END_DATE | VARCHAR(STRING)
QUARTER_START | VARCHAR(STRING)
QUARTER_END | VARCHAR(STRING)
PLAN_CLASS_CODE | BIGINT
PLAN_CLASS | VARCHAR(STRING)
MARKETPLACE_CODE | BIGINT
ADVERTISER_ID | BIGINT

ksql> CREATE table ADVERTISERTABLE
(table1 varchar,
op_type varchar,
op_ts varchar,
current_ts varchar,
pos varchar,
advertiser_id BIGINT ,
advertiser_name varchar,
advertiser_creation_dt varchar,
advertiser_modification_dt varchar,
create_date varchar,
accounting_ident varchar)
WITH (KAFKA_TOPIC='NBC_APPS.TBL_MS_ADVERTISER_COMPACT',
VALUE_FORMAT='JSON',
KEY = 'advertiser_id');

ksql> describe ADVERTISERTABLE;

Name : ADVERTISERTABLE
Field | Type

ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
TABLE1 | VARCHAR(STRING)
OP_TYPE | VARCHAR(STRING)
OP_TS | VARCHAR(STRING)
CURRENT_TS | VARCHAR(STRING)
POS | VARCHAR(STRING)
ADVERTISER_ID | INTEGER
ADVERTISER_NAME | VARCHAR(STRING)
ADVERTISER_CREATION_DT | VARCHAR(STRING)
ADVERTISER_MODIFICATION_DT | VARCHAR(STRING)
CREATE_DATE | VARCHAR(STRING)
ACCOUNTING_IDENT | VARCHAR(STRING)

ksql> select * from ADVERTISERTABLE where ADVERTISER_ID = 15019;
1533693600068 | 15019 | NBC_APPS.TBL_MS_ADVERTISER | U | 2018-07-18 18:51:34.000000 | 2018-07-18 14:51:39.397000 | 00000003470236836100 | 15019 | Hanesbrands, INC | 2010-11-18:00:00:00 | 2018-01-29:00:00:00 | 2018-07-02:10:52:15 | 0017000000NoYWVAA3

ksql> select * from PLANSTREAM where ADVERTISER_ID = 15019;
1533693225559 | 382591 | NBC_APPS.TBL_MS_PLAN_WK | U | 2018-07-18 18:51:34.000000 | 2018-07-18 14:51:39.397000 | 00000003470236836100 | 382591 | UNVSO Hanes 17/18 UF NBC Universo (382446)-V2 | 382446 | null | null | null | -1 | 10247 | 128 | -1 | 25-JUN-18 | 26-JUN-18 | 02-APR-18 | 01-JUL-18 | 1170001 | National | 1599002 | 15019

ksql> select a.ADVERTISER_ID,b.ADVERTISER_ID,plan_id from PLANSTREAM a left join ADVERTISERTABLE b on (a.ADVERTISER_ID = b.ADVERTISER_ID);
15019 | null | 382591

Both PLANSTREAM and ADVERTISERTABLE contain a record with advertiser_id = 15019, but the join does not match them: the table-side column comes back as null.
I also tried the approach described at https://stackoverflow.com/questions/50102662/confluent-4-1-0-ksql-stream-table-join-table-data-null/50103660#50103660, but it did not work.
Joining an Avro-format stream with a JSON-format table does not work for us.
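
One check that may help narrow this down. In the query output above, the PLANSTREAM row has ROWKEY 382591 (the plan ID), while the ADVERTISERTABLE row has ROWKEY 15019. DESCRIBE also reports ADVERTISER_ID as BIGINT on the stream and INTEGER on the table. If this KSQL version matches stream-table joins on the message key rather than on the column value (an assumption, not confirmed here), re-keying the stream by the join column before joining may be worth trying. The sketch below is untested against this cluster, and PLANSTREAM_BY_ADVERTISER is a hypothetical name.

```sql
-- Compare the message key with the join column on each side.
SELECT ROWKEY, ADVERTISER_ID FROM PLANSTREAM WHERE ADVERTISER_ID = 15019;
SELECT ROWKEY, ADVERTISER_ID FROM ADVERTISERTABLE WHERE ADVERTISER_ID = 15019;

-- Hypothetical re-keyed copy of the stream, keyed by the join column.
CREATE STREAM PLANSTREAM_BY_ADVERTISER AS
  SELECT * FROM PLANSTREAM PARTITION BY ADVERTISER_ID;

-- Same join as before, run against the re-keyed stream.
SELECT a.ADVERTISER_ID, b.ADVERTISER_ID, a.PLAN_ID
FROM PLANSTREAM_BY_ADVERTISER a
LEFT JOIN ADVERTISERTABLE b ON (a.ADVERTISER_ID = b.ADVERTISER_ID);
```

If the re-keyed join still returns null, the key mismatch is probably not the cause, and the difference in value formats or column types would be the next thing to look at.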
