Skip to content

Instantly share code, notes, and snippets.

@vlcanunal
vlcanunal / docker-compose.yml
Created February 8, 2021 13:00
docker-compose.yml
version: "2"
services:
  # Both broker services are built from the same ./live-kafka build context.
  live-kafka1:
    build: live-kafka
    hostname: live-kafka1
    restart: always
  live-kafka2:
    build: live-kafka
@vlcanunal
vlcanunal / dwh.js
Last active September 17, 2022 13:41
var parseCityData = function (cityList, ts) {
var totalCityList = { time: "", cityData: [] };
(cityList || []).forEach((element) => {
hasCityExisted = totalCityList.cityData.some(function (item) {
return item.cityId === element.CITYID;
});
// If it doesnt exist in our list, we are adding the data to our list.
if (!hasCityExisted) {
totalCityList.cityData.push({
cityId: element.CITYID,
@vlcanunal
vlcanunal / window_session.sql
Created February 4, 2021 12:04
Usage of window session
-- Per-city, per-month totals of cartCount and productPrice, aggregated over
-- session windows (a window stays open while orders for the same key arrive
-- within 2 hours of each other), for orders dated 2020-11-07 .. 2020-11-30.
--
-- The date bounds must use ASCII hyphens: timestamptostring() emits '-'
-- (U+002D), which sorts below an en dash (U+2013), so en-dash bounds would
-- make this string comparison reject every row.
create table city_analytics_order_realtime_black_friday
with (KAFKA_TOPIC='city_analytics.order_realtime_black_friday',PARTITIONS=1,VALUE_FORMAT='JSON') as
select timestamptostring(orderDate, 'yyyy-MM') as orderDate,
       cityId,
       SUM(cartCount) as totalCount,
       SUM(productPrice) as totalPrice
from city_analytics_order_realtime_full WINDOW SESSION (2 HOURS)
where timestamptostring(orderDate, 'yyyy-MM-dd') >= '2020-11-07' and
      timestamptostring(orderDate, 'yyyy-MM-dd') <= '2020-11-30'
group by timestamptostring(orderDate, 'yyyy-MM'), cityId;
@vlcanunal
vlcanunal / hopping_window.sql
Created February 4, 2021 11:40
Usage of hopping window
-- Per-city, per-month totals of cartCount and productPrice, aggregated over
-- hopping windows, for orders dated 2020-11-07 .. 2020-11-30.
--
-- Hopping windows require ADVANCE BY <= SIZE (Kafka Streams rejects a hop
-- larger than the window). With SIZE 5 DAYS / ADVANCE BY 2 DAYS, a new 5-day
-- window starts every 2 days, so consecutive windows overlap by 3 days.
-- NOTE(review): the original had the two values reversed (SIZE 2, ADVANCE 5);
-- confirm the intended window size and hop.
--
-- The date bounds must use ASCII hyphens: timestamptostring() emits '-'
-- (U+002D), which sorts below an en dash (U+2013), so en-dash bounds would
-- make this string comparison reject every row.
create table city_analytics_order_realtime_black_friday
with (KAFKA_TOPIC='city_analytics.order_realtime_black_friday',PARTITIONS=1,VALUE_FORMAT='JSON') as
select timestamptostring(orderDate, 'yyyy-MM') as orderDate,
       cityId,
       SUM(cartCount) as totalCount,
       SUM(productPrice) as totalPrice
from city_analytics_order_realtime_full WINDOW HOPPING (SIZE 5 DAYS, ADVANCE BY 2 DAYS)
where timestamptostring(orderDate, 'yyyy-MM-dd') >= '2020-11-07' and
      timestamptostring(orderDate, 'yyyy-MM-dd') <= '2020-11-30'
group by timestamptostring(orderDate, 'yyyy-MM'), cityId;
@vlcanunal
vlcanunal / black_friday.sql
Created February 4, 2021 11:39
Usage of EMIT FINAL
-- Per-city, per-month totals of cartCount and productPrice, aggregated over
-- 1-second tumbling windows, for orders dated 2020-11-27 .. 2020-11-30.
--
-- The date bounds must use ASCII hyphens: timestamptostring() emits '-'
-- (U+002D), which sorts below an en dash (U+2013), so en-dash bounds would
-- make this string comparison reject every row.
create table city_analytics_order_realtime_black_friday
with (KAFKA_TOPIC='city_analytics.order_realtime_black_friday',PARTITIONS=1,VALUE_FORMAT='JSON') as
select timestamptostring(orderDate, 'yyyy-MM') as orderDate,
       cityId,
       SUM(cartCount) as totalCount,
       SUM(productPrice) as totalPrice
from city_analytics_order_realtime_full WINDOW TUMBLING (SIZE 1 SECONDS)
where timestamptostring(orderDate, 'yyyy-MM-dd') >= '2020-11-27' and
      timestamptostring(orderDate, 'yyyy-MM-dd') <= '2020-11-30'
group by timestamptostring(orderDate, 'yyyy-MM'), cityId;
-- Non-windowed variant: running per-city, per-month totals of cartCount and
-- productPrice for orders dated 2020-11-27 .. 2020-11-30, emitted on every
-- change. It creates a table with the same name as a windowed definition in
-- this file, so only one of them can exist at a time; drop the other first.
--
-- The date bounds must use ASCII hyphens: timestamptostring() emits '-'
-- (U+002D), which sorts below an en dash (U+2013), so en-dash bounds would
-- make this string comparison reject every row.
create table city_analytics_order_realtime_black_friday
with (KAFKA_TOPIC='city_analytics.order_realtime_black_friday',PARTITIONS=1,VALUE_FORMAT='JSON') as
select timestamptostring(orderDate, 'yyyy-MM') as orderDate,
       cityId,
       SUM(cartCount) as totalCount,
       SUM(productPrice) as totalPrice
from city_analytics_order_realtime_full
where timestamptostring(orderDate, 'yyyy-MM-dd') >= '2020-11-27' and
      timestamptostring(orderDate, 'yyyy-MM-dd') <= '2020-11-30'
group by timestamptostring(orderDate, 'yyyy-MM'), cityId
EMIT CHANGES;
@vlcanunal
vlcanunal / docker-compose.yml
Last active February 18, 2021 09:25
docker-compose.yml file for KSQL
version: '2'
services:
  ksqldb-server:
    # Image comes from the KSQL_IMAGE variable (shell environment or .env file).
    # Compose interpolation is ${VAR}; it is quoted because an unquoted value
    # starting with '{' is parsed by YAML as a flow mapping, not a string.
    image: "${KSQL_IMAGE}"
    hostname: ksqldb-server
    container_name: ksqldb-server
    ports:
      - "8088:8088"
    environment:
      # KSQL_-prefixed variables map to ksqlDB server config; this one sets
      # ksql.streams.auto.offset.reset so queries start from the earliest offset.
      KSQL_KSQL_STREAMS_AUTO_OFFSET_RESET: "earliest"
@vlcanunal
vlcanunal / city_analytics.sql
Created February 4, 2021 10:41
Stream processing with KSQL
CREATE STREAM city_analytics_realtime_full \
WITH (KAFKA_TOPIC='analytics.city_realtime_full',PARTITIONS=1,VALUE_FORMAT='JSON') AS \
select orderDate, \
invoiceAddress->cityCode cityId, \
ARRAY_LENGTH(lines) as cartCount , \
orderStatus->name as status, \
lines[1]->product->name productName, \
lines[1]->product->productContentId contentId, \
lines[1]->price productPrice, \
lines[1]->imageUrls[1] productImage, \