medium_kafka_apprentice_cookbook
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Main {
    public static void main(String[] args) throws Exception {
        // Configure your producer
        Properties producerProperties = new Properties();
        producerProperties.put("bootstrap.servers", "localhost:29092");
        producerProperties.put("acks", "all");
        producerProperties.put("retries", 0);
        producerProperties.put("linger.ms", 1);
        producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
        producerProperties.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        producerProperties.put("schema.registry.url", "http://localhost:8081");

        // Initialize a producer
        Producer<Long, AvroHelloMessage> producer = new KafkaProducer<>(producerProperties);

        // Use it whenever you need: send() takes a ProducerRecord wrapping the target topic, key, and value
        producer.send(new ProducerRecord<>("hello_topic_avro", 1L,
                new AvroHelloMessage(1L, "this is a message", 2.4f, 1)));

        // Release the producer's resources (this also flushes pending records)
        producer.close();
    }
}
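The producer above serializes AvroHelloMessage instances through the Confluent Schema Registry. That class is typically generated from an Avro schema; a minimal sketch of such a schema, assuming field names matching the JSON payloads shown in the ksqlDB session further down (the namespace is hypothetical), could look like this:

{
  "namespace": "com.example.avro",
  "type": "record",
  "name": "AvroHelloMessage",
  "fields": [
    {"name": "user_id", "type": "long"},
    {"name": "message", "type": "string"},
    {"name": "value", "type": "float"},
    {"name": "version", "type": "int"}
  ]
}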
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;

import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Main {
    public static Properties configureConsumer() {
        Properties consumerProperties = new Properties();
        consumerProperties.put("bootstrap.servers", "localhost:29092");
        consumerProperties.put("group.id", "HelloConsumer");
        consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
        consumerProperties.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        consumerProperties.put("schema.registry.url", "http://localhost:8081");
        // Configure the Avro deserializer to convert the received data to a SpecificRecord (i.e. AvroHelloMessage)
        // instead of a GenericRecord (i.e. schema + array of deserialized data).
        consumerProperties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
        return consumerProperties;
    }

    public static void main(String[] args) throws Exception {
        // Initialize a consumer
        final Consumer<Long, AvroHelloMessage> consumer = new KafkaConsumer<>(configureConsumer());

        // Choose the topics you will be polling from.
        // You can also subscribe to all topics matching a regex.
        consumer.subscribe(Pattern.compile("hello_topic_avro"));

        // poll() returns all messages from the current consumer offset
        final AtomicBoolean shouldStop = new AtomicBoolean(false);
        Thread consumerThread = new Thread(() -> {
            final Duration timeout = Duration.ofSeconds(5);
            while (!shouldStop.get()) {
                for (ConsumerRecord<Long, AvroHelloMessage> record : consumer.poll(timeout)) {
                    // Use your record
                    AvroHelloMessage value = record.value();
                }
                // Be kind to the broker between polls
                try {
                    Thread.sleep(5);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            consumer.close(timeout);
        });

        // Start consuming && do other things
        consumerThread.start();
        // [...]

        // Stop the consumer thread and wait for it to finish
        shouldStop.set(true);
        consumerThread.join();
    }
}
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

public class Main {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // The whole processing logic lives in the builder configuration:
        // split each line into words, group by word, and count occurrences.
        final StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("streams-plaintext-input")
                .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
                .groupBy((key, value) -> value)
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
                .toStream()
                .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // Attach a shutdown handler to catch Ctrl+C
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        // The consumer loop is handled by the library
        streams.start();
        latch.await();
    }
}
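To try the word-count topology end to end, you can feed text into the input topic and read the counts back with the console tools shipped with Kafka. A minimal sketch, assuming the broker from the snippets above and that both topics already exist (depending on your distribution, the scripts may carry a .sh suffix):

# Send a few lines of text to the input topic
$ kafka-console-producer --bootstrap-server localhost:29092 --topic streams-plaintext-input

# Read the counts; keys are words (String), values are counts (Long)
$ kafka-console-consumer --bootstrap-server localhost:29092 \
    --topic streams-wordcount-output --from-beginning \
    --property print.key=true \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer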
{
  "name": "mongo-sink",
  "config": {
    "topics": "mongo-source",
    "tasks.max": "1",
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "connection.uri": "mongodb://${file:/auth.properties:username}:${file:/auth.properties:password}@mongo:27017",
    "database": "kafka_connect",
    "collection": "sink",
    "max.num.retries": "1",
    "retries.defer.timeout": "5000",
    "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy",
    "post.processor.chain": "com.mongodb.kafka.connect.sink.processor.DocumentIdAdder",
    "delete.on.null.values": "false",
    "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy"
  }
}
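The ${file:...} placeholders in connection.uri keep the MongoDB credentials out of the connector definition. They are only resolved if the Connect worker has a file config provider enabled and the referenced file exists on the worker. A sketch, with hypothetical credential values:

# /auth.properties on the Connect worker (values are placeholders)
username=mongo_user
password=mongo_password

# Added to the Connect worker configuration
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider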
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
http://localhost:8083/connectors -d @sink-conf.json
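Once the POST is accepted, the same Connect REST API reports whether the connector and its task are actually running:

$ curl -s http://localhost:8083/connectors/mongo-sink/status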
ksql> list topics;
 Kafka Topic      | Partitions | Partition Replicas
-----------------------------------------------------
 hello_topic_json | 1          | 1
-----------------------------------------------------
ksql> print 'hello_topic_json' from beginning;
Key format: KAFKA_BIGINT or KAFKA_DOUBLE or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2021/05/25 08:44:20.922 Z, key: 1, value: {"user_id":1,"message":"this is a message","value":2.4,"version":1}
rowtime: 2021/05/25 08:44:20.967 Z, key: 1, value: {"user_id":1,"message":"this is another message","value":2.4,"version":2}
rowtime: 2021/05/25 08:44:20.970 Z, key: 2, value: {"user_id":2,"message":"this is another message","value":2.6,"version":1}
-- Let's create a table from the previous topic
ksql> CREATE TABLE messages (user_id BIGINT PRIMARY KEY, message VARCHAR)
> WITH (KAFKA_TOPIC = 'hello_topic_json', VALUE_FORMAT='JSON');
-- We can see the list and details of each table
ksql> list tables;
 Table Name | Kafka Topic      | Key Format | Value Format | Windowed
------------------------------------------------------------------------
 MESSAGES   | hello_topic_json | KAFKA      | JSON         | false
------------------------------------------------------------------------
ksql> describe messages;
Name : MESSAGES
 Field   | Type
------------------------------------------
 USER_ID | BIGINT           (primary key)
 MESSAGE | VARCHAR(STRING)
------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
-- Apart from a few additions to the language, queries are written in almost standard SQL.
ksql> select * from messages EMIT CHANGES;
+---------+-------------------------+
|USER_ID  |MESSAGE                  |
+---------+-------------------------+
|1        |this is another message |
|2        |this is another message |