Last active
February 23, 2021 13:51
-
-
Save maikelsperandio/f795448684da129178bc7543bb65cd5f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#curl -L https://cnfl.io/cli | sh -s -- -b . | |
Pré requisito é já ter o Kafka e o elasticsearch configurados e em execução. | |
Baixar o confluent community plataform. | |
https://www.confluent.io/download/ | |
Configurar o arquivo etc/ksqldb/ksql-server.properties apontando a url do kafka. | |
Subir o serviço do ksqldb a partir do diretório da confluent: | |
./bin/ksql-server-start etc/ksqldb/ksql-server.properties & | |
Conectar no ksqldb: | |
./bin/ksql | |
Criação do stream para estruturar os dados no ksql: | |
CREATE STREAM pocksql | |
(idClassificacao INT, | |
classificacao VARCHAR, | |
codigoEquipamento VARCHAR, | |
dataHoraPassagemFormatada VARCHAR, | |
faixa INT, | |
placa VARCHAR, | |
velocidadeAferida INT, | |
mesAno VARCHAR, | |
dia VARCHAR) | |
WITH ( VALUE_FORMAT = 'JSON', | |
KAFKA_TOPIC = 'poc-ksql', | |
timestamp = 'dataHoraPassagemFormatada', | |
timestamp_format = 'dd/MM/yyyy HH:mm:ss.SSS'); | |
Criação da tabela a partir do stream configurado no passo anterior: | |
create table classificacao_by_dia as | |
select | |
count(*) as total, | |
classificacao, | |
dia, | |
mesano | |
from pocksql | |
group by classificacao, dia, mesano emit changes; | |
Para monitorar a tabela basta executar o comando abaixo: | |
select * from classificacao_by_dia emit changes; | |
Baixar o kafka e configurar o arquivo config/connect-distributed.properties: | |
bootstrap.server=10.0.1.182:9092 | |
plugin.path=/home/maikel/tools/connectors | |
Subir o serviço do connect: | |
./bin/connect-distributed.sh config/connect-distributed.properties & | |
Baixar o kafka-connect-elasticsearch do hub da confluent | |
https://www.confluent.io/hub/confluentinc/kafka-connect-elasticsearch | |
Descompactar o arquivo dentro do diretório configurado anteriormente no arquivo connect-distributed.properties. | |
No caso, o caminho é esse: /home/maikel/tools/connectors | |
Criação do sink a partir do ksqldb: | |
CREATE SINK CONNECTOR CLASSIFICACAO_BY_DIA_2_SINK WITH ( | |
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector', | |
'connection.url' = 'http://10.0.1.182:9200', | |
'type.name' = '_doc', | |
'topics' = 'CLASSIFICACAO_BY_DIA_2', | |
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter', | |
'value.converter' = 'org.apache.kafka.connect.json.JsonConverter', | |
'schema.ignore' = 'true', | |
'value.converter.schemas.enable' = 'false', | |
'schemas.enable' = 'false' | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment