Created
December 20, 2018 20:25
-
-
Save edgars/29f366d14d404044c6edf78efb90d87c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Application-level metadata: the name under which this Siddhi app is
-- deployed and a short human-readable description.
@App:name('ETLStreamAppEnglish')
@App:description('ETL from CSV ')
-- Input stream fed by a file source.
--   * Files are picked up from the directory given in dir.uri and read
--     line by line (mode = 'line'); tailing is disabled.
--   * action.after.process = 'delete' removes each file once it has been
--     processed.
--   * The CSV mapper binds each stream attribute to a column position of
--     the input line (presumably zero-based, since dateAndTime maps to '0').
--     Columns 3 and 4 of the file are not mapped to any attribute.
-- NOTE(review): the CSV source mapper documents a `header.present` option;
-- confirm that `header = 'true'` is honored by the deployed mapper version.
@source(type = 'file', mode = 'line', tailing = 'false', dir.uri = 'file:/Users/edgar/Documents/DEV/2018/SP/ETL/IN', action.after.process = 'delete',
    @map(type = 'csv', header = 'true',
        @attributes(
            district = '2',
            address = '1',
            latitude = '7',
            crime = '5',
            dateAndTime = '0',
            crimeID = '6',
            longitude = '8')))
define stream CrimesInitialLoadStream (
    dateAndTime string,
    address string,
    district string,
    crime string,
    crimeID string,
    latitude string,
    longitude string);
-- Output stream carrying the event count; every event inserted here is
-- written out by the log sink at "info" priority.
@sink(
    type = 'log',
    priority = "info")
define stream totalLinesStream (
    totalCount long);
-- Output stream for the filtered crime records. Each event is serialized
-- with the JSON mapper and published to partition 0 of the Kafka topic
-- 'kafka_result_topic_crimes_5400' via the broker at localhost:9092.
@sink(
    type = 'kafka',
    topic = 'kafka_result_topic_crimes_5400',
    bootstrap.servers = 'localhost:9092',
    partition.no = '0',
    @map(type = 'json'))
define stream crimeID5400Stream (
    dateAndTime string,
    address     string,
    crime       string,
    latitude    string,
    longitude   string,
    totalCount  long);
-- Counts every event arriving on CrimesInitialLoadStream and emits the
-- count as totalCount on totalLinesStream.
@info(name = 'QueryTotalDeLinhas')
from CrimesInitialLoadStream
select
    count() as totalCount
insert into totalLinesStream;
-- Keeps only the events whose crimeID equals '5400' (compared as a string,
-- since crimeID is declared as string), forwards the selected attributes
-- and attaches the count of matching events as totalCount.
@info(name = 'QuerycrimeID5400')
from CrimesInitialLoadStream[crimeID == '5400']
select
    dateAndTime,
    address,
    crime,
    latitude,
    longitude,
    count() as totalCount
insert into crimeID5400Stream;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment