Created
March 15, 2022 23:02
-
-
Save rareddy/97c02fb472594072a65f98207157887b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.bf2.performance; | |
import org.apache.kafka.clients.admin.Admin; | |
import org.apache.kafka.clients.admin.NewTopic; | |
import org.apache.kafka.clients.producer.KafkaProducer; | |
import org.apache.kafka.clients.producer.Producer; | |
import org.apache.kafka.clients.producer.ProducerRecord; | |
import java.io.IOException; | |
import java.nio.file.Path; | |
import java.nio.file.Paths; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.Properties; | |
/* | |
* run to allow the permissions | |
curl -vs -H"Authorization: Bearer $(./get_access_token.sh --owner)" http://admin-server-$(./managed_kafka.sh --list | jq -r .items[0].bootstrap_server_host | awk -F: '{print $1}')/rest/acls -XPOST -H'Content-type: application/json' --data '{"resourceType":"GROUP", "resourceName":"*", "patternType":"LITERAL", "principal":"User:srvc-acct-d29f3539-d2bd-41a1-95eb-cb230d808a64", "operation":"ALL", "permission":"ALLOW"}' | |
curl -vs -H"Authorization: Bearer $(./get_access_token.sh --owner)" http://admin-server-$(./managed_kafka.sh --list | jq -r .items[0].bootstrap_server_host | awk -F: '{print $1}')/rest/acls -XPOST -H'Content-type: application/json' --data '{"resourceType":"TOPIC", "resourceName":"*", "patternType":"LITERAL", "principal":"User:srvc-acct-d29f3539-d2bd-41a1-95eb-cb230d808a64", "operation":"ALL", "permission":"ALLOW"}' | |
oc get secret -o yaml ${KAFKA_USERNAME}-cluster-ca-cert -n ${KAFKA_INSTANCE_NAMESPACE} -o json | jq -r '.data."ca.crt"' | base64 --decode > ca.crt | |
keytool -import -trustcacerts -keystore truststore.jks -storepass password -noprompt -alias mk -file ca.crt | |
*/ | |
public class SimpleProducer { | |
public static void main(String[] args) throws Exception { | |
Properties props = getConnectionProperties(); | |
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); | |
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); | |
String topicName = "foo"; | |
createTopic(props, topicName, 2, 1); | |
Producer<String, String> producer = new KafkaProducer<String, String>(props); | |
int i = 11; | |
while(true) { | |
producer.send(new ProducerRecord<String, String>(topicName,Integer.toString(i), Integer.toString(i))); | |
i++; | |
System.out.print("."); | |
if (System.in.available() > 0) { | |
break; | |
} | |
Thread.sleep(10000); | |
} | |
System.out.println("Message sent successfully"); | |
producer.close(); | |
} | |
public static Properties getConnectionProperties() throws IOException, Exception { | |
String namespace = "kafka-c8ocdclj0sisqtttfl7g"; | |
String name = "trial1"; | |
String url = "c-ocdclj-sisqtttfl-g"; | |
String bootstrap = String.format("%s-kafka-bootstrap-%s.%s:443", name, url, ManagedKafkaDeployer.DOMAIN); | |
// KubeClient client = ManagedKafkaDeployer.connectToKube("kafka-config"); | |
// Secret secret = client.client().secrets().inNamespace(namespace).withName(name+"-cluster-ca-cert").get(); | |
// | |
// if (!certFile.toFile().exists()) { | |
// // ManagedKafkaDeployer.writeTrustStore(certFile, new String(secret.getData().get("ca.p12").getBytes())); | |
// } | |
// String password = new String(Base64.decode(secret.getData().get("ca.password").getBytes())); | |
// create instance for properties to access producer configs | |
Path certFile = Paths.get("truststore.jks"); | |
Properties props = new Properties(); | |
props.put("security.protocol", "SASL_SSL"); | |
props.put("sasl.mechanism", "PLAIN"); | |
props.put("ssl.truststore.type", "JKS"); | |
props.put("ssl.truststore.location", certFile.toAbsolutePath().toString()); | |
props.put("ssl.truststore.password", "password"); | |
props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"srvc-acct-d29f3539-d2bd-41a1-95eb-cb230d808a64\" password=\"7fe5cb95-8b86-452b-9428-564ce8fc3651\";"); | |
props.put("bootstrap.servers", bootstrap); | |
return props; | |
} | |
public static void createTopic(Properties properties, String topicName, int partitions, int replicationFactor) throws Exception { | |
try (Admin admin = Admin.create(properties)) { | |
NewTopic newTopic = new NewTopic(topicName, partitions, (short)replicationFactor); | |
if (admin.listTopics().names().get().stream().filter(s -> s.equals(topicName)).count() == 0) { | |
List<NewTopic> newTopics = new ArrayList<NewTopic>(); | |
newTopics.add(newTopic); | |
admin.createTopics(newTopics); | |
admin.close(); | |
} else { | |
System.out.println("Found existing topic:"+topicName); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment