Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
KSQL code from my NDC Sydney talk
-- Sets the auto.offset.reset property to earliest for the statements that follow.
-- This is the Kafka consumer setting that starts reading from the earliest available offset.
SET 'auto.offset.reset' = 'earliest';
-- Registers the stream twitter_raw over the Kafka topic twitter_json_01 with JSON values.
-- Only CreatedAt and Id are declared BIGINT. Every other field is declared VARCHAR.
CREATE STREAM twitter_raw ( \
    CreatedAt BIGINT, \
    Id BIGINT, \
    Text VARCHAR, \
    SOURCE VARCHAR, \
    Truncated VARCHAR, \
    InReplyToStatusId VARCHAR, \
    InReplyToUserId VARCHAR, \
    InReplyToScreenName VARCHAR, \
    GeoLocation VARCHAR, \
    Place VARCHAR, \
    Favorited VARCHAR, \
    Retweeted VARCHAR, \
    FavoriteCount VARCHAR, \
    User VARCHAR, \
    Retweet VARCHAR, \
    Contributors VARCHAR, \
    RetweetCount VARCHAR, \
    RetweetedByMe VARCHAR, \
    CurrentUserRetweetId VARCHAR, \
    PossiblySensitive VARCHAR, \
    Lang VARCHAR, \
    WithheldInCountries VARCHAR, \
    HashtagEntities VARCHAR, \
    UserMentionEntities VARCHAR, \
    MediaEntities VARCHAR, \
    SymbolEntities VARCHAR, \
    URLEntities VARCHAR \
) WITH ( \
    KAFKA_TOPIC = 'twitter_json_01', \
    PARTITIONS = 12, \
    VALUE_FORMAT = 'JSON' \
);
-- Derives the stream twitter from twitter_raw.
--   * CreatedAt is formatted as a string with pattern yyyy-MM-dd HH:mm:ss.SSS
--   * Name, ScreenName, Location and Description are extracted from the user JSON
--   * Text, hashtagentities and lang are passed through unchanged
-- The filter lowercases hashtagentities with LCASE, so the LIKE pattern has to be
-- lowercase too. A mixed-case pattern can never match text that was lowercased first.
CREATE STREAM twitter AS \
SELECT TIMESTAMPTOSTRING(CreatedAt, 'yyyy-MM-dd HH:mm:ss.SSS') AS CreatedAt, \
EXTRACTJSONFIELD(user,'$.Name') AS user_Name, \
EXTRACTJSONFIELD(user,'$.ScreenName') AS user_ScreenName, \
EXTRACTJSONFIELD(user,'$.Location') AS user_Location, \
EXTRACTJSONFIELD(user,'$.Description') AS user_Description, \
Text, hashtagentities, lang \
FROM twitter_raw \
WHERE LCASE(hashtagentities) LIKE '%ndcsydney%';
-- Ad hoc query: user name and text of tweets whose lowercased text contains gamussa.
SELECT \
    user_name, \
    text \
FROM twitter \
WHERE LCASE(text) LIKE '%gamussa%';
-- Counts tweets per user_screenname in one-hour tumbling windows,
-- considering only tweets whose lowercased text contains gamussa.
CREATE TABLE user_tweet_count AS \
    SELECT \
        user_screenname, \
        COUNT(*) AS tweet_count \
    FROM twitter \
        WINDOW TUMBLING (SIZE 1 HOUR) \
    WHERE LCASE(text) LIKE '%gamussa%' \
    GROUP BY user_screenname;
-- Display-friendly copy of user_tweet_count: ROWTIME is rendered as a
-- yyyy-MM-dd HH:mm:ss.SSS string and exposed under the alias WINDOW_START.
CREATE TABLE user_tweet_count_display AS \
    SELECT \
        TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss.SSS') AS window_start, \
        user_screenname, \
        tweet_count \
    FROM user_tweet_count;
-- Keeps only the rows of USER_TWEET_COUNT_DISPLAY whose TWEET_COUNT is greater than 3.
-- Note that this is a threshold filter on the count, not a ranking of the top three users.
CREATE TABLE top_3_conf AS \
    SELECT \
        window_start, \
        user_screenname, \
        tweet_count \
    FROM user_tweet_count_display \
    WHERE tweet_count > 3;
-- Ad hoc query: screen name and tweet count for every row in top_3_conf.
SELECT \
    user_screenname, \
    tweet_count \
FROM top_3_conf;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment