Skip to content

Instantly share code, notes, and snippets.

@rareddy
Created March 15, 2022 23:02
Show Gist options
  • Save rareddy/97c02fb472594072a65f98207157887b to your computer and use it in GitHub Desktop.
Save rareddy/97c02fb472594072a65f98207157887b to your computer and use it in GitHub Desktop.
package org.bf2.performance;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
/*
* run to allow the permissions
curl -vs -H"Authorization: Bearer $(./get_access_token.sh --owner)" http://admin-server-$(./managed_kafka.sh --list | jq -r .items[0].bootstrap_server_host | awk -F: '{print $1}')/rest/acls -XPOST -H'Content-type: application/json' --data '{"resourceType":"GROUP", "resourceName":"*", "patternType":"LITERAL", "principal":"User:srvc-acct-d29f3539-d2bd-41a1-95eb-cb230d808a64", "operation":"ALL", "permission":"ALLOW"}'
curl -vs -H"Authorization: Bearer $(./get_access_token.sh --owner)" http://admin-server-$(./managed_kafka.sh --list | jq -r .items[0].bootstrap_server_host | awk -F: '{print $1}')/rest/acls -XPOST -H'Content-type: application/json' --data '{"resourceType":"TOPIC", "resourceName":"*", "patternType":"LITERAL", "principal":"User:srvc-acct-d29f3539-d2bd-41a1-95eb-cb230d808a64", "operation":"ALL", "permission":"ALLOW"}'
oc get secret ${KAFKA_USERNAME}-cluster-ca-cert -n ${KAFKA_INSTANCE_NAMESPACE} -o json | jq -r '.data."ca.crt"' | base64 --decode > ca.crt
keytool -import -trustcacerts -keystore truststore.jks -storepass password -noprompt -alias mk -file ca.crt
*/
/**
 * Minimal Kafka producer used to push a slow trickle of test records at a Kafka instance.
 *
 * <p>{@link #main(String[])} makes sure the target topic exists and then sends one record
 * every ten seconds until input becomes available on standard input.
 */
public class SimpleProducer {

    /** Pause between two consecutive records, in milliseconds. */
    private static final long SEND_INTERVAL_MILLIS = 10_000L;

    /**
     * Creates topic {@code foo} if needed and sends an increasing counter (starting at 11) as
     * both key and value, one record per interval, until {@code System.in} has data available.
     *
     * @param args ignored
     * @throws Exception if the connection setup, topic creation, or the send loop fails
     */
    public static void main(String[] args) throws Exception {
        Properties props = getConnectionProperties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        String topicName = "foo";
        createTopic(props, topicName, 2, 1);

        // try-with-resources guarantees the producer is closed even if the loop throws.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            int i = 11;
            while (true) {
                String payload = Integer.toString(i);
                // send() is asynchronous; the callback is the only place a delivery failure shows up.
                producer.send(new ProducerRecord<>(topicName, payload, payload), (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("Failed to send record " + payload + ": " + exception);
                    }
                });
                i++;
                System.out.print(".");
                // Any pending input on stdin (e.g. pressing Enter) stops the loop.
                if (System.in.available() > 0) {
                    break;
                }
                Thread.sleep(SEND_INTERVAL_MILLIS);
            }
            // Push out anything still buffered before reporting completion.
            producer.flush();
            System.out.println("Message sent successfully");
        }
    }

    /**
     * Builds the client properties for a SASL_SSL / PLAIN connection to the test instance.
     *
     * <p>The trust store is read from {@code truststore.jks} in the current working directory.
     * An earlier, disabled variant of this method obtained the CA material from the
     * {@code <name>-cluster-ca-cert} secret in namespace {@code kafka-c8ocdclj0sisqtttfl7g}
     * through {@code ManagedKafkaDeployer}; it is not used here.
     *
     * <p>NOTE(review): the instance coordinates, trust store password and service-account
     * credentials below are hard-coded. They should be moved out of source control before this
     * is used for anything beyond a throwaway test.
     *
     * @return properties containing security, trust store and bootstrap server settings
     * @throws IOException declared for callers; not raised by the current implementation
     * @throws Exception declared for callers; not raised by the current implementation
     */
    public static Properties getConnectionProperties() throws IOException, Exception {
        String name = "trial1";
        String url = "c-ocdclj-sisqtttfl-g";
        String bootstrap = String.format("%s-kafka-bootstrap-%s.%s:443", name, url, ManagedKafkaDeployer.DOMAIN);

        Path certFile = Paths.get("truststore.jks");

        Properties props = new Properties();
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("ssl.truststore.type", "JKS");
        props.put("ssl.truststore.location", certFile.toAbsolutePath().toString());
        props.put("ssl.truststore.password", "password");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"srvc-acct-d29f3539-d2bd-41a1-95eb-cb230d808a64\" password=\"7fe5cb95-8b86-452b-9428-564ce8fc3651\";");
        props.put("bootstrap.servers", bootstrap);
        return props;
    }

    /**
     * Creates {@code topicName} unless a topic with that exact name is already listed.
     *
     * <p>The call blocks until the creation request has completed, so a failed creation is
     * reported to the caller instead of being silently dropped.
     *
     * @param properties        admin client configuration
     * @param topicName         name of the topic to look up or create
     * @param partitions        number of partitions for a newly created topic
     * @param replicationFactor replication factor for a newly created topic; must fit in a {@code short}
     * @throws IllegalArgumentException if {@code replicationFactor} does not fit in a {@code short}
     * @throws Exception if listing or creating topics fails
     */
    public static void createTopic(Properties properties, String topicName, int partitions, int replicationFactor) throws Exception {
        // NewTopic takes a short; refuse values the cast would silently truncate.
        if (replicationFactor != (short) replicationFactor) {
            throw new IllegalArgumentException("replicationFactor out of range: " + replicationFactor);
        }
        try (Admin admin = Admin.create(properties)) {
            if (admin.listTopics().names().get().contains(topicName)) {
                System.out.println("Found existing topic:"+topicName);
                return;
            }
            List<NewTopic> newTopics = new ArrayList<>();
            newTopics.add(new NewTopic(topicName, partitions, (short) replicationFactor));
            // createTopics() is asynchronous; wait on the result so failures propagate.
            admin.createTopics(newTopics).all().get();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment