Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
-- Siddhi application header: the application name and a free-text description.
@App:name("ETLStreamApp")
@App:description("ETL from CSV ")
-- Input stream populated by a file source. Files under dir.uri are consumed in
-- 'line' mode with tailing disabled, and action.after.process is set to 'delete'.
-- Every line goes through the CSV mapper; each @attributes value is the CSV
-- column index feeding that stream attribute (dataHora comes from index 0;
-- indexes 3 and 4 are not mapped). All attributes are kept as strings.
-- NOTE(review): the CSV source mapper documents `header.present`; confirm that
-- `header` is honored by the siddhi-map-csv version in use, otherwise the
-- header row may be processed as a regular event.
@source(
    type = 'file',
    mode = 'line',
    tailing = 'false',
    dir.uri = 'file:/Users/edgar/Desktop/trash/poc-orizon/arquivos/IN',
    action.after.process = 'delete',
    @map(
        type = 'csv',
        header = 'true',
        @attributes(
            distrito = '2',
            endereco = '1',
            latitude = '7',
            crime = '5',
            dataHora = '0',
            codigocrime = '6',
            longitude = '8'
        )
    )
)
define stream CargaInicialCrimesStream (
    dataHora string,
    endereco string,
    distrito string,
    crime string,
    codigocrime string,
    latitude string,
    longitude string
);
-- Stream wired to a log sink (priority "info"); it has the same attributes as
-- the CrimesMonitorados table.
-- NOTE(review): no query in the visible part of this file inserts into this
-- stream (QueryCodigoCrime5400 writes to CrimesMonitorados) - confirm whether
-- it is still needed.
@sink(type = 'log', priority = "info")
define stream DitritoCrime5400 (
    dataHora string,
    endereco string,
    crime string,
    latitude string,
    longitude string,
    totalCount long
);
-- Output stream for QueryTotalDeLinhas; each event carries a single count and
-- is written to the log sink with priority "info".
@sink(type = 'log', priority = "info")
define stream TotalDeLinhasStream (
    totalCount long
);
-- Table backed by an RDBMS store (MySQL database `streamdb` on localhost:3306),
-- indexed on dataHora. QueryCodigoCrime5400 inserts its results here.
-- NOTE(review): the database credentials are hard-coded in plain text -
-- consider moving them to deployment configuration.
@store(
    type = "rdbms",
    jdbc.url = "jdbc:mysql://localhost:3306/streamdb",
    username = "root",
    password = "mysql",
    jdbc.driver.name = "com.mysql.jdbc.Driver",
    pool.properties = "maximumPoolSize:30"
)
@Index('dataHora')
define table CrimesMonitorados (
    dataHora string,
    endereco string,
    crime string,
    latitude string,
    longitude string,
    totalCount long
);
-- Counts the events arriving on CargaInicialCrimesStream and publishes the
-- count to TotalDeLinhasStream. No window is applied to the input stream.
@info(name = 'QueryTotalDeLinhas')
from CargaInicialCrimesStream
select
    count() as totalCount
insert into TotalDeLinhasStream;
-- Keeps only events whose crime code (codigocrime) equals '5400' - the filter
-- is on the crime code, not on the district - and stores them, together with
-- the count of matching events, in the CrimesMonitorados table.
@info(name = 'QueryCodigoCrime5400')
from CargaInicialCrimesStream[codigocrime == '5400']
select
    dataHora,
    endereco,
    crime,
    latitude,
    longitude,
    count() as totalCount
insert into CrimesMonitorados;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment