Created
February 21, 2020 08:44
-
-
Save diggzhang/6f70fcaf0489dd47177e44de53b9df7f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.onion.dataprocess.helpers | |
import java.util.concurrent.Future | |
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord, RecordMetadata } | |
/**
 * Serializable handle for sending records through a [[KafkaProducer]].
 *
 * The producer itself is never stored eagerly: only the `createProducer`
 * factory is held, and the producer is built from it the first time
 * [[producer]] (or one of the `send` overloads) is used.
 *
 * @param createProducer factory invoked once per (deserialized) instance to
 *                       build the underlying producer
 * @tparam K record key type
 * @tparam V record value type
 */
class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {

  /* This is the key idea that allows us to work around running into
     NotSerializableExceptions: the producer is created lazily from the
     factory instead of being part of the serialized state. It is marked
     @transient so that an instance whose producer has already been forced
     can still be serialized; a deserialized copy re-runs the factory on
     first access. */
  @transient lazy val producer: KafkaProducer[K, V] = createProducer()

  /**
   * Sends a keyed record to `topic`.
   *
   * @return the future handed back by the underlying producer's `send`
   */
  def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, key, value))

  /**
   * Sends a record to `topic` without supplying a key.
   *
   * @return the future handed back by the underlying producer's `send`
   */
  def send(topic: String, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, value))
}
/** Factory methods for building a [[KafkaSink]] from producer configuration. */
object KafkaSink {

  // Explicit converters (.asJava / .asScala) instead of the implicit
  // scala.collection.JavaConversions, which is deprecated in Scala 2.12 and
  // removed in 2.13. JavaConverters is available on 2.11 through 2.13.
  import scala.collection.JavaConverters._

  /**
   * Builds a sink whose producer is created from `config` on first use.
   *
   * @param config producer configuration, passed to the `KafkaProducer` constructor
   * @tparam K record key type
   * @tparam V record value type
   */
  def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {
    val createProducerFunc: () => KafkaProducer[K, V] = () => {
      // Convert inside the factory so the closure captures the Scala Map
      // itself rather than a Java wrapper around it.
      val producer = new KafkaProducer[K, V](config.asJava)
      sys.addShutdownHook {
        // Ensure that, on executor JVM shutdown, the Kafka producer sends
        // any buffered messages to Kafka before shutting down.
        producer.close()
      }
      producer
    }
    new KafkaSink(createProducerFunc)
  }

  /**
   * Builds a sink from a `java.util.Properties` configuration by copying its
   * entries into an immutable Scala map and delegating to the `Map` overload.
   *
   * @param config producer configuration as Java properties
   * @tparam K record key type
   * @tparam V record value type
   */
  def apply[K, V](config: java.util.Properties): KafkaSink[K, V] =
    apply[K, V](config.asScala.toMap[String, Object])
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment