-- Source: GitHub Gist edgars/29f366d14d404044c6edf78efb90d87c by @edgars,
-- created December 20, 2018 20:25.
-- Siddhi application metadata (name and description of the deployed app).
-- Pipeline defined below:
--   CSV files in a watched directory -> CrimesInitialLoadStream
--     -> QueryTotalDeLinhas: running count of all rows, logged via totalLinesStream
--     -> QuerycrimeID5400: rows with crimeID '5400', published as JSON to Kafka
--        via crimeID5400Stream
@App:name('ETLStreamAppEnglish')
@App:description('ETL from CSV ')
-- Raw input stream: CSV files placed in the IN directory are read line by line
-- and each line becomes one event.
--   tailing = 'false'               -> files are presumably read once rather than
--                                      followed for appended lines (see siddhi-io-file docs)
--   action.after.process = 'delete' -> a file is removed once it has been processed
-- The csv mapper's @attributes binds each stream attribute to a zero-based CSV
-- column index; CSV columns 3 and 4 are not mapped.
-- Every attribute is kept as a string, including the coordinates.
-- NOTE(review): with mode = 'line' each line reaches the mapper as its own message;
-- confirm that header = 'true' skips only the file's header row in this SP version.
@source(
    type = 'file',
    mode = 'line',
    tailing = 'false',
    dir.uri = 'file:/Users/edgar/Documents/DEV/2018/SP/ETL/IN',
    action.after.process = 'delete',
    @map(
        type = 'csv',
        header = 'true',
        @attributes(
            dateAndTime = '0',
            address = '1',
            district = '2',
            crime = '5',
            crimeID = '6',
            latitude = '7',
            longitude = '8')))
define stream CrimesInitialLoadStream (
    dateAndTime string,
    address     string,
    district    string,
    crime       string,
    crimeID     string,
    latitude    string,
    longitude   string
);
-- Log sink: each running total emitted by QueryTotalDeLinhas is written to the
-- server log at the priority given below.
@sink(type = 'log', priority = "info")
define stream totalLinesStream (
    totalCount long
);
-- Kafka sink: each event is serialized as JSON. No custom @payload is given, so
-- the mapper's default JSON layout is used. Events are published to partition 0
-- of the topic below on the broker at localhost:9092.
@sink(
    type = 'kafka',
    topic = 'kafka_result_topic_crimes_5400',
    bootstrap.servers = 'localhost:9092',
    partition.no = '0',
    @map(type = 'json'))
define stream crimeID5400Stream (
    dateAndTime string,
    address     string,
    crime       string,
    latitude    string,
    longitude   string,
    totalCount  long
);
-- Running count of every row read from the CSV input.
-- The query has no window, so count() accumulates over all events since the app
-- started and emits the updated total for each incoming event.
-- (The query name is Portuguese for "total number of lines".)
@info(name = 'QueryTotalDeLinhas')
from CrimesInitialLoadStream
select
    count() as totalCount
insert into totalLinesStream;
-- Forward only rows whose crimeID equals the string '5400' (the filter is on
-- crimeID, not on district).
-- totalCount is a running count of matching rows. The query has no window, so it
-- never resets; each output event carries the attributes of the row just matched.
@info(name = 'QuerycrimeID5400')
from CrimesInitialLoadStream[crimeID == '5400']
select
    dateAndTime,
    address,
    crime,
    latitude,
    longitude,
    count() as totalCount
insert into crimeID5400Stream;
-- End of gist edgars/29f366d14d404044c6edf78efb90d87c.