This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Docker Compose (file format version 2) definition of the live-kafka services.
# Both services are built from the local "live-kafka" build context.
# NOTE(review): the live-kafka2 entry may be incomplete in this excerpt — confirm
# whether it should also declare hostname/restart like live-kafka1.
version: "2"
services:
  live-kafka1:
    build: live-kafka
    hostname: live-kafka1
    # Restart the container whenever it stops.
    restart: always
  live-kafka2:
    build: live-kafka
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var parseCityData = function (cityList, ts) { | |
var totalCityList = { time: "", cityData: [] }; | |
(cityList || []).forEach((element) => { | |
hasCityExisted = totalCityList.cityData.some(function (item) { | |
return item.cityId === element.CITYID; | |
}); | |
// If it doesnt exist in our list, we are adding the data to our list. | |
if (!hasCityExisted) { | |
totalCityList.cityData.push({ | |
cityId: element.CITYID, |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Materializes per-month, per-city order totals (summed cartCount and productPrice)
-- from city_analytics_order_realtime_full, grouped into SESSION windows with a
-- 2-hour inactivity gap. Only orders dated 2020-11-07 through 2020-11-30
-- (inclusive, compared as 'yyyy-MM-dd' strings) are aggregated.
-- Output is written as JSON to the single-partition topic named in KAFKA_TOPIC.
--
-- The date literals must use ASCII hyphens (-): they are compared lexicographically
-- against the 'yyyy-MM-dd' formatted value, and a typographic en dash sorts after
-- '-', which would make the range filter reject every row.
create table city_analytics_order_realtime_black_friday
  with (KAFKA_TOPIC='city_analytics.order_realtime_black_friday',PARTITIONS=1,VALUE_FORMAT='JSON') as
  select timestamptostring(orderDate, 'yyyy-MM') as orderDate,
         cityId,
         SUM(cartCount) as totalCount,
         SUM(productPrice) as totalPrice
  from city_analytics_order_realtime_full WINDOW SESSION (2 HOURS)
  where timestamptostring(orderDate, 'yyyy-MM-dd') >= '2020-11-07' and
        timestamptostring(orderDate, 'yyyy-MM-dd') <= '2020-11-30'
  group by timestamptostring(orderDate, 'yyyy-MM'), cityId
  EMIT CHANGES;
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Materializes per-month, per-city order totals (summed cartCount and productPrice)
-- from city_analytics_order_realtime_full using a HOPPING window.
-- Only orders dated 2020-11-07 through 2020-11-30 (inclusive, compared as
-- 'yyyy-MM-dd' strings) are aggregated.
-- Output is written as JSON to the single-partition topic named in KAFKA_TOPIC.
--
-- The date literals must use ASCII hyphens (-): they are compared lexicographically
-- against the 'yyyy-MM-dd' formatted value, and a typographic en dash sorts after
-- '-', which would make the range filter reject every row.
--
-- NOTE(review): ADVANCE BY (5 DAYS) is larger than SIZE (2 DAYS). Kafka Streams
-- hopping windows require 0 < advance <= size, so this window definition is
-- expected to be rejected; the values may have been swapped. Left unchanged
-- because the intended window cannot be determined from this file — confirm.
create table city_analytics_order_realtime_black_friday
  with (KAFKA_TOPIC='city_analytics.order_realtime_black_friday',PARTITIONS=1,VALUE_FORMAT='JSON') as
  select timestamptostring(orderDate, 'yyyy-MM') as orderDate,
         cityId,
         SUM(cartCount) as totalCount,
         SUM(productPrice) as totalPrice
  from city_analytics_order_realtime_full WINDOW HOPPING (SIZE 2 DAYS, ADVANCE BY 5 DAYS)
  where timestamptostring(orderDate, 'yyyy-MM-dd') >= '2020-11-07' and
        timestamptostring(orderDate, 'yyyy-MM-dd') <= '2020-11-30'
  group by timestamptostring(orderDate, 'yyyy-MM'), cityId
  EMIT CHANGES;
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Materializes per-month, per-city order totals (summed cartCount and productPrice)
-- from city_analytics_order_realtime_full, bucketed into 1-second TUMBLING windows.
-- Only orders dated 2020-11-27 through 2020-11-30 (inclusive, compared as
-- 'yyyy-MM-dd' strings) are aggregated.
-- Output is written as JSON to the single-partition topic named in KAFKA_TOPIC.
--
-- The date literals must use ASCII hyphens (-): they are compared lexicographically
-- against the 'yyyy-MM-dd' formatted value, and a typographic en dash sorts after
-- '-', which would make the range filter reject every row.
create table city_analytics_order_realtime_black_friday
  with (KAFKA_TOPIC='city_analytics.order_realtime_black_friday',PARTITIONS=1,VALUE_FORMAT='JSON') as
  select timestamptostring(orderDate, 'yyyy-MM') as orderDate,
         cityId,
         SUM(cartCount) as totalCount,
         SUM(productPrice) as totalPrice
  from city_analytics_order_realtime_full WINDOW TUMBLING (SIZE 1 SECONDS)
  where timestamptostring(orderDate, 'yyyy-MM-dd') >= '2020-11-27' and
        timestamptostring(orderDate, 'yyyy-MM-dd') <= '2020-11-30'
  group by timestamptostring(orderDate, 'yyyy-MM'), cityId
  EMIT CHANGES;
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Materializes per-month, per-city order totals (summed cartCount and productPrice)
-- from city_analytics_order_realtime_full without any windowing.
-- Only orders dated 2020-11-27 through 2020-11-30 (inclusive, compared as
-- 'yyyy-MM-dd' strings) are aggregated.
-- Output is written as JSON to the single-partition topic named in KAFKA_TOPIC.
--
-- The date literals must use ASCII hyphens (-): they are compared lexicographically
-- against the 'yyyy-MM-dd' formatted value, and a typographic en dash sorts after
-- '-', which would make the range filter reject every row.
create table city_analytics_order_realtime_black_friday
  with (KAFKA_TOPIC='city_analytics.order_realtime_black_friday',PARTITIONS=1,VALUE_FORMAT='JSON') as
  select timestamptostring(orderDate, 'yyyy-MM') as orderDate,
         cityId,
         SUM(cartCount) as totalCount,
         SUM(productPrice) as totalPrice
  from city_analytics_order_realtime_full
  where timestamptostring(orderDate, 'yyyy-MM-dd') >= '2020-11-27' and
        timestamptostring(orderDate, 'yyyy-MM-dd') <= '2020-11-30'
  group by timestamptostring(orderDate, 'yyyy-MM'), cityId
  EMIT CHANGES;
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Docker Compose (file format version 2) definition of the ksqldb-server service.
# NOTE(review): the environment section may continue beyond this excerpt — confirm.
version: '2'
services:
  ksqldb-server:
    # Image name comes from the KSQL_IMAGE variable. Compose substitution syntax is
    # ${VAR}; the previous "{$KSQL_IMAGE}" form is parsed by YAML as a flow mapping,
    # not as a string, so the variable was never substituted.
    image: ${KSQL_IMAGE}
    hostname: ksqldb-server
    container_name: ksqldb-server
    ports:
      # Publishes container port 8088 on host port 8088.
      - "8088:8088"
    environment:
      # Sets the ksqlDB streams auto.offset.reset property to "earliest".
      KSQL_KSQL_STREAMS_AUTO_OFFSET_RESET: "earliest"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
CREATE STREAM city_analytics_realtime_full \ | |
WITH (KAFKA_TOPIC='analytics.city_realtime_full',PARTITIONS=1,VALUE_FORMAT='JSON') AS \ | |
select orderDate, \ | |
invoiceAddress->cityCode cityId, \ | |
ARRAY_LENGTH(lines) as cartCount , \ | |
orderStatus->name as status, \ | |
lines[1]->product->name productName, \ | |
lines[1]->product->productContentId contentId, \ | |
lines[1]->price productPrice, \ | |
lines[1]->imageUrls[1] productImage, \ |