Created
December 30, 2019 17:06
-
-
Save sasaken555/66783466ac8e7699af4fa980b42c1bc5 to your computer and use it in GitHub Desktop.
Event Stream (manged Kafka service) Pub/Sub
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.google.gson.Gson; | |
import org.apache.kafka.clients.CommonClientConfigs; | |
import org.apache.kafka.clients.consumer.Consumer; | |
import org.apache.kafka.clients.consumer.ConsumerConfig; | |
import org.apache.kafka.clients.consumer.ConsumerRecords; | |
import org.apache.kafka.clients.consumer.KafkaConsumer; | |
import org.apache.kafka.clients.producer.*; | |
import org.apache.kafka.common.config.SaslConfigs; | |
import org.apache.kafka.common.config.SslConfigs; | |
import java.time.Duration; | |
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.List; | |
import java.util.Properties; | |
import java.util.concurrent.ExecutionException; | |
import java.util.concurrent.Future; | |
public class Handler { | |
static final String KAFKA_SERIALIZER_CLASS = | |
"org.apache.kafka.common.serialization.StringSerializer"; | |
static final String KAFKA_DESERIALIZER_CLASS = | |
"org.apache.kafka.common.serialization.StringDeserializer"; | |
static final String TOPIC = "sometopic"; | |
public static void main(String[] args) { | |
// Setup producer | |
Producer<String, String> producer = getProducer("event-stream-producer"); | |
System.out.println("Producer connected to Kafka servers!"); | |
// Setup consumer | |
Consumer<String, String> consumer = getConsumer("event-stream-consumer", "G3"); | |
consumer.subscribe(Arrays.asList(TOPIC)); | |
System.out.println("Consumer connected to Kafka servers!"); | |
try { | |
// Send message | |
Future<RecordMetadata> future = | |
producer.send(new ProducerRecord<String, String>(TOPIC, "keyZ", "Awesome Kafka")); | |
// Check metadata | |
RecordMetadata rm = future.get(); | |
final String topic = rm.topic(); | |
final int partition = rm.partition(); | |
System.out.println("Sent message to topic=" + topic + ", partition=" + partition); | |
} catch (InterruptedException | ExecutionException e) { | |
// Describe error | |
e.printStackTrace(); | |
System.out.println("Failed to send message"); | |
// Cleanup | |
producer.close(); | |
consumer.close(); | |
System.exit(1); | |
} | |
// consumer loop (polls 10 times) | |
for (int counter = 0; counter < 10; counter++) { | |
System.out.printf("loop count=%d \n", counter + 1); | |
// consumer.poll(Long l) is duplicated! use java.time.Duration as args instead. | |
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1500)); | |
records.forEach( | |
record -> { | |
System.out.printf( | |
"offset=%d, partition=%s, value=%s \n", | |
record.offset(), record.partition(), record.value()); | |
}); | |
} | |
// Cleanup | |
producer.close(); | |
consumer.close(); | |
System.exit(0); | |
} | |
/** | |
* Kafka Producerを取得する。 | |
* @param clientId クライアントID | |
* @return Producer | |
*/ | |
static Producer<String, String> getProducer(String clientId) { | |
final String rawServer = System.getenv("kafka_brokers_sasl"); | |
final String servers = convertArrayToCsv(rawServer); | |
final String apiKey = System.getenv("apikey"); | |
Properties configs = getProducerConfigs(servers, apiKey, clientId); | |
return new KafkaProducer<>(configs); | |
} | |
/** | |
* Kafka Consumerを取得する。 | |
* @param clientId クライアントID | |
* @param groupId グループID | |
* @return Consumer | |
*/ | |
static Consumer<String, String> getConsumer(String clientId, String groupId) { | |
final String rawServer = System.getenv("kafka_brokers_sasl"); | |
final String servers = convertArrayToCsv(rawServer); | |
final String apiKey = System.getenv("apikey"); | |
Properties configs = getConsumerConfigs(servers, apiKey, clientId, groupId); | |
return new KafkaConsumer<>(configs); | |
} | |
/** | |
* array string -> CSV string | |
* @return 接続文字列 | |
*/ | |
static String convertArrayToCsv(String arrayString) { | |
// 環境変数から取り出し | |
Gson gson = new Gson(); | |
final String[] servers = gson.fromJson(arrayString, String[].class); | |
// 連結 | |
StringBuilder sb = new StringBuilder(); | |
for (int si = 0; si < servers.length; si++) { | |
sb.append(servers[si]); | |
if (si != servers.length - 1) { | |
sb.append(","); | |
} | |
} | |
return sb.toString(); | |
} | |
/** | |
* KafkaのProducer/Consumer共通の設定値を取得する。 | |
* @param servers ブートストラップサーバーのカンマ区切り配列 | |
* @param apiKey APIキー | |
* @return 設定値 | |
*/ | |
static Properties getCommonConfigs(String servers, String apiKey) { | |
Properties configs = new Properties(); | |
configs.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, servers); | |
configs.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL"); | |
configs.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN"); | |
configs.setProperty( | |
SaslConfigs.SASL_JAAS_CONFIG, | |
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"" | |
+ apiKey | |
+ "\";"); | |
configs.setProperty(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); | |
configs.setProperty(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, "TLSv1.2"); | |
configs.setProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS"); | |
return configs; | |
} | |
/** | |
* KafkaのProducerの設定値を取得する。 | |
* | |
* @param servers ブートストラップサーバーのカンマ区切り配列 | |
* @param apiKey APIキー | |
* @param clientId 一意なクライアントID | |
* @return 設定値 | |
*/ | |
static Properties getProducerConfigs(String servers, String apiKey, String clientId) { | |
Properties configs = new Properties(); | |
configs.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KAFKA_SERIALIZER_CLASS); | |
configs.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KAFKA_SERIALIZER_CLASS); | |
configs.setProperty(ProducerConfig.CLIENT_ID_CONFIG, clientId); | |
configs.setProperty(ProducerConfig.ACKS_CONFIG, "1"); // Ack if wrote to leader partition | |
configs.setProperty(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG, "use_all_dns_ips"); | |
configs.putAll(getCommonConfigs(servers, apiKey)); | |
return configs; | |
} | |
/** | |
* KafkaのProducerの設定値を取得する。 | |
* | |
* @param servers ブートストラップサーバーのカンマ区切り配列 | |
* @param apiKey APIキー | |
* @param clientId 一意なクライアントID | |
* @param groupId ConsumerGroupのID | |
* @return 設定値 | |
*/ | |
static Properties getConsumerConfigs( | |
String servers, String apiKey, String clientId, String groupId) { | |
Properties configs = new Properties(); | |
configs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KAFKA_DESERIALIZER_CLASS); | |
configs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KAFKA_DESERIALIZER_CLASS); | |
configs.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId); | |
configs.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); | |
configs.putAll(getCommonConfigs(servers, apiKey)); | |
return configs; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment