Skip to content

Instantly share code, notes, and snippets.

@ag-ramachandran
Created August 4, 2022 15:50
Show Gist options
  • Save ag-ramachandran/564acf1f2a767ac4e9b2c82be19dba70 to your computer and use it in GitHub Desktop.
Save ag-ramachandran/564acf1f2a767ac4e9b2c82be19dba70 to your computer and use it in GitHub Desktop.
Send a simple protobuf message
package kafka;
import com.github.javafaker.Faker;
import com.google.protobuf.ByteString;
import com.microsoft.kusto.test.protobuf.SensorReadingWithDeviceDataImpl;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Random;
public class SendKafkaProtoSensorInfo {
public static void main(String[] args) {
String bootstrapServers = "127.0.0.1:9092";
var properties = new Properties();
properties.setProperty("bootstrap.servers", bootstrapServers);
properties.setProperty("schema.registry.url", "http://localhost:8081");
properties.setProperty("key.serializer", StringSerializer.class.getName());
properties.setProperty("value.serializer", KafkaProtobufSerializer.class.getName());
KafkaProducer<String, SensorReadingWithDeviceDataImpl.SensorReading> producer = new KafkaProducer<>(properties);
// Specify Topic
var topic = "pbstreams.sensor";
for (int i = 100; i < 200; i++) {
Faker faker = new Faker();
Random rd = new Random();
var deviceData = SensorReadingWithDeviceDataImpl.SensorReading.Device.newBuilder()
.setDeviceID("Device-" + i).setEnabled(faker.bool().bool());
List<Double> repeatedDoubles = Arrays.asList(rd.nextDouble(),rd.nextDouble(),rd.nextDouble());
var sensorData = SensorReadingWithDeviceDataImpl.SensorReading.newBuilder()
.setReading(rd.nextDouble()).setDevice(deviceData)
.setDateTime(System.currentTimeMillis()).setFloatVal(rd.nextFloat()).setInt32Val(rd.nextInt())
.setUint32Val(rd.nextInt()).setUint64Val(rd.nextLong()).setSint32Val(rd.nextInt())
.setBytesVal(ByteString.copyFrom(("Bytes=="+i).getBytes(StandardCharsets.UTF_8)))
.addAllDoubleArrayVal(repeatedDoubles).build();
var record = new ProducerRecord<>(topic, "SensorData" + i, sensorData);
producer.send(record);
}
producer.flush();
producer.close();
System.out.println("==>Sent Data Successfully to topic<==" + topic);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment