Skip to content

Instantly share code, notes, and snippets.

@vlcanunal
Created February 4, 2021 10:41
Show Gist options
  • Save vlcanunal/5a17aba2d010e87ccf652805a8aa1f62 to your computer and use it in GitHub Desktop.
Save vlcanunal/5a17aba2d010e87ccf652805a8aa1f62 to your computer and use it in GitHub Desktop.
Stream processing with KSQL
CREATE STREAM city_analytics_realtime_full \
WITH (KAFKA_TOPIC='analytics.city_realtime_full',PARTITIONS=1,VALUE_FORMAT='JSON') AS \
select orderDate, \
invoiceAddress->cityCode cityId, \
ARRAY_LENGTH(lines) as cartCount , \
orderStatus->name as status, \
lines[1]->product->name productName, \
lines[1]->product->productContentId contentId, \
lines[1]->price productPrice, \
lines[1]->imageUrls[1] productImage, \
orderPrice->totalProduct totalProduct \
from analytics_created_order \
where orderStatus->name = 'Created' EMIT CHANGES;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment