Skip to content

Instantly share code, notes, and snippets.

@maikelsperandio
Last active February 23, 2021 13:51
Show Gist options
  • Save maikelsperandio/f795448684da129178bc7543bb65cd5f to your computer and use it in GitHub Desktop.
Save maikelsperandio/f795448684da129178bc7543bb65cd5f to your computer and use it in GitHub Desktop.
#curl -L https://cnfl.io/cli | sh -s -- -b .
Pré requisito é já ter o Kafka e o elasticsearch configurados e em execução.
Baixar o confluent community plataform.
https://www.confluent.io/download/
Configurar o arquivo etc/ksqldb/ksql-server.properties apontando a url do kafka.
Subir o serviço do ksqldb a partir do diretório da confluent:
./bin/ksql-server-start etc/ksqldb/ksql-server.properties &
Conectar no ksqldb:
./bin/ksql
Criação do stream para estruturar os dados no ksql:
CREATE STREAM pocksql
(idClassificacao INT,
classificacao VARCHAR,
codigoEquipamento VARCHAR,
dataHoraPassagemFormatada VARCHAR,
faixa INT,
placa VARCHAR,
velocidadeAferida INT,
mesAno VARCHAR,
dia VARCHAR)
WITH ( VALUE_FORMAT = 'JSON',
KAFKA_TOPIC = 'poc-ksql',
timestamp = 'dataHoraPassagemFormatada',
timestamp_format = 'dd/MM/yyyy HH:mm:ss.SSS');
Criação da tabela a partir do stream configurado no passo anterior:
create table classificacao_by_dia as
select
count(*) as total,
classificacao,
dia,
mesano
from pocksql
group by classificacao, dia, mesano emit changes;
Para monitorar a tabela basta executar o comando abaixo:
select * from classificacao_by_dia emit changes;
Baixar o kafka e configurar o arquivo config/connect-distributed.properties:
bootstrap.server=10.0.1.182:9092
plugin.path=/home/maikel/tools/connectors
Subir o serviço do connect:
./bin/connect-distributed.sh config/connect-distributed.properties &
Baixar o kafka-connect-elasticsearch do hub da confluent
https://www.confluent.io/hub/confluentinc/kafka-connect-elasticsearch
Descompactar o arquivo dentro do diretório configurado anteriormente no arquivo connect-distributed.properties.
No caso, o caminho é esse: /home/maikel/tools/connectors
Criação do sink a partir do ksqldb:
CREATE SINK CONNECTOR CLASSIFICACAO_BY_DIA_2_SINK WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'connection.url' = 'http://10.0.1.182:9200',
'type.name' = '_doc',
'topics' = 'CLASSIFICACAO_BY_DIA_2',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'value.converter' = 'org.apache.kafka.connect.json.JsonConverter',
'schema.ignore' = 'true',
'value.converter.schemas.enable' = 'false',
'schemas.enable' = 'false'
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment