Skip to content

Instantly share code, notes, and snippets.

@sasaken555
Created December 30, 2019 17:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sasaken555/66783466ac8e7699af4fa980b42c1bc5 to your computer and use it in GitHub Desktop.
Save sasaken555/66783466ac8e7699af4fa980b42c1bc5 to your computer and use it in GitHub Desktop.
Event Stream (managed Kafka service) Pub/Sub
import com.google.gson.Gson;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
 * Demo client that publishes one message to a Kafka topic over SASL_SSL and then
 * polls the same topic as a consumer.
 *
 * <p>Connection settings are read from environment variables:
 * {@code kafka_brokers_sasl} (a JSON string array of broker URLs) and
 * {@code apikey} (the SASL/PLAIN password).
 */
public class Handler {

  static final String KAFKA_SERIALIZER_CLASS =
      "org.apache.kafka.common.serialization.StringSerializer";
  static final String KAFKA_DESERIALIZER_CLASS =
      "org.apache.kafka.common.serialization.StringDeserializer";
  static final String TOPIC = "sometopic";

  public static void main(String[] args) {
    // System.exit skips try-with-resources cleanup, so the actual work happens
    // in run(), which closes its clients before returning an exit code.
    System.exit(run());
  }

  /**
   * Runs the produce-then-consume cycle.
   *
   * @return process exit code: 0 on success, 1 if sending the message failed
   */
  private static int run() {
    // try-with-resources guarantees both clients are closed on every path,
    // including an exception thrown from the consumer loop (the original code
    // leaked both clients in that case).
    try (Producer<String, String> producer = getProducer("event-stream-producer");
        Consumer<String, String> consumer = getConsumer("event-stream-consumer", "G3")) {
      System.out.println("Producer connected to Kafka servers!");
      consumer.subscribe(Arrays.asList(TOPIC));
      System.out.println("Consumer connected to Kafka servers!");

      try {
        // Send one message and block until the broker acknowledges it.
        Future<RecordMetadata> future =
            producer.send(new ProducerRecord<String, String>(TOPIC, "keyZ", "Awesome Kafka"));
        RecordMetadata rm = future.get();
        System.out.println(
            "Sent message to topic=" + rm.topic() + ", partition=" + rm.partition());
      } catch (InterruptedException e) {
        // Restore the interrupt flag so upstream code can observe the interruption.
        Thread.currentThread().interrupt();
        e.printStackTrace();
        System.out.println("Failed to send message");
        return 1;
      } catch (ExecutionException e) {
        e.printStackTrace();
        System.out.println("Failed to send message");
        return 1;
      }

      // Consumer loop (polls 10 times).
      // consumer.poll(long) is deprecated; use the java.time.Duration overload.
      for (int counter = 0; counter < 10; counter++) {
        System.out.printf("loop count=%d \n", counter + 1);
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1500));
        records.forEach(
            record ->
                System.out.printf(
                    "offset=%d, partition=%s, value=%s \n",
                    record.offset(), record.partition(), record.value()));
      }
    }
    return 0;
  }

  /**
   * Builds a Kafka producer from the environment configuration.
   *
   * @param clientId unique client ID
   * @return a connected {@link Producer}
   */
  static Producer<String, String> getProducer(String clientId) {
    final String rawServer = System.getenv("kafka_brokers_sasl");
    final String servers = convertArrayToCsv(rawServer);
    final String apiKey = System.getenv("apikey");
    Properties configs = getProducerConfigs(servers, apiKey, clientId);
    return new KafkaProducer<>(configs);
  }

  /**
   * Builds a Kafka consumer from the environment configuration.
   *
   * @param clientId unique client ID
   * @param groupId consumer-group ID
   * @return a connected {@link Consumer}
   */
  static Consumer<String, String> getConsumer(String clientId, String groupId) {
    final String rawServer = System.getenv("kafka_brokers_sasl");
    final String servers = convertArrayToCsv(rawServer);
    final String apiKey = System.getenv("apikey");
    Properties configs = getConsumerConfigs(servers, apiKey, clientId, groupId);
    return new KafkaConsumer<>(configs);
  }

  /**
   * Converts a JSON string array (e.g. {@code ["host1:9093","host2:9093"]}) into a
   * comma-separated bootstrap-server string.
   *
   * @param arrayString JSON array of broker URLs, typically from the
   *     {@code kafka_brokers_sasl} environment variable
   * @return comma-separated connection string
   * @throws IllegalStateException if the input is null or not a JSON array
   */
  static String convertArrayToCsv(String arrayString) {
    final String[] servers = new Gson().fromJson(arrayString, String[].class);
    if (servers == null) {
      // Gson returns null for null/blank input; fail fast with a clear message
      // instead of a bare NullPointerException on servers.length.
      throw new IllegalStateException(
          "kafka_brokers_sasl is not set or is not a JSON array: " + arrayString);
    }
    return String.join(",", servers);
  }

  /**
   * Builds the configuration shared by producer and consumer
   * (bootstrap servers, SASL_SSL security settings).
   *
   * @param servers comma-separated bootstrap-server list
   * @param apiKey API key used as the SASL/PLAIN password
   * @return common client configuration
   */
  static Properties getCommonConfigs(String servers, String apiKey) {
    Properties configs = new Properties();
    configs.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, servers);
    configs.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    configs.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN");
    configs.setProperty(
        SaslConfigs.SASL_JAAS_CONFIG,
        "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\""
            + apiKey
            + "\";");
    configs.setProperty(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2");
    configs.setProperty(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, "TLSv1.2");
    configs.setProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS");
    return configs;
  }

  /**
   * Builds the Kafka producer configuration.
   *
   * @param servers comma-separated bootstrap-server list
   * @param apiKey API key
   * @param clientId unique client ID
   * @return producer configuration
   */
  static Properties getProducerConfigs(String servers, String apiKey, String clientId) {
    Properties configs = new Properties();
    configs.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KAFKA_SERIALIZER_CLASS);
    configs.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KAFKA_SERIALIZER_CLASS);
    configs.setProperty(ProducerConfig.CLIENT_ID_CONFIG, clientId);
    configs.setProperty(ProducerConfig.ACKS_CONFIG, "1"); // Ack once the leader partition has the record
    configs.setProperty(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG, "use_all_dns_ips");
    configs.putAll(getCommonConfigs(servers, apiKey));
    return configs;
  }

  /**
   * Builds the Kafka consumer configuration.
   *
   * @param servers comma-separated bootstrap-server list
   * @param apiKey API key
   * @param clientId unique client ID
   * @param groupId consumer-group ID
   * @return consumer configuration
   */
  static Properties getConsumerConfigs(
      String servers, String apiKey, String clientId, String groupId) {
    Properties configs = new Properties();
    configs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KAFKA_DESERIALIZER_CLASS);
    configs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KAFKA_DESERIALIZER_CLASS);
    configs.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
    configs.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    configs.putAll(getCommonConfigs(servers, apiKey));
    return configs;
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment