@defndaines
Created February 8, 2017 16:31
Kafka façade for use in a Spark job (adapted liberally from elsewhere)
import java.util.concurrent.Future
import java.util.concurrent.atomic.AtomicReference

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

import scala.collection.JavaConversions._

/** This construct allows a KafkaProducer to be reused within an executor.
  *
  * Establishing communication with the Kafka cluster is expensive, so reusing the
  * producer improves performance and reduces the risk of connection errors.
  */
object KafkaSink {
  def apply(config: Map[String, Object], topic: String): KafkaSink = {
    val create = () => {
      val producer = new KafkaProducer[String, Array[Byte]](config)
      sys.addShutdownHook {
        producer.close()
      }
      producer
    }
    new KafkaSink(create, topic)
  }
}

class KafkaSink(createProducer: () => KafkaProducer[String, Array[Byte]], topic: String) extends Serializable {
  // The lazy producer is created on the executor, so the sink serializes correctly.
  lazy val producer = createProducer()

  def send(key: String, bytes: Array[Byte], callback: Callback): Future[RecordMetadata] =
    producer.send(new ProducerRecord(topic, key, bytes), callback)
}

class KafkaSinkExceptionHandler extends Callback {
  private val lastException = new AtomicReference[Option[Exception]](None)

  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
    lastException.set(Option(exception))

  def throwExceptionIfAny(): Unit =
    lastException.getAndSet(None).foreach(ex => throw ex)
}
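
Because KafkaSink is Serializable and the producer is created lazily on first use, it can be built once on the driver and shared with every executor as a broadcast variable, which is where the kafkaSink.value reference in the usage example below comes from. A minimal driver-side sketch; the broker address, serializer classes, topic name, and the SparkContext variable sc are placeholders for illustration, not part of the gist:

    // Producer configuration built on the driver (values here are assumptions).
    val producerConfig: Map[String, Object] = Map(
      "bootstrap.servers" -> "broker-1:9092",
      "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
      "value.serializer" -> "org.apache.kafka.common.serialization.ByteArraySerializer"
    )

    // Broadcast once; each executor lazily creates its own KafkaProducer on first send.
    val kafkaSink = sc.broadcast(KafkaSink(producerConfig, "output-topic"))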

And an example usage:

    records.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        val logger = Logger.getLogger(getClass.getName)
        val callback = new KafkaSinkExceptionHandler

        // Map each record to the Future returned by the producer, materializing
        // the list so every send is issued before waiting on acks.
        val metadata = records.map { rec =>
          callback.throwExceptionIfAny()
          kafkaSink.value.send(filter.filterId.toString, rec, callback)
        }.toList

        // Wait on the ack from the Kafka publisher before continuing, or records may be lost.
        if (metadata.nonEmpty) {
          logger.info("Waiting on metadata to ack publish to Kafka.")
          metadata.foreach(_.get())
          callback.throwExceptionIfAny()
        }
      }
    }
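
Collecting the Futures returned by send and calling get() on each one blocks the partition until the broker has acknowledged every record, while throwExceptionIfAny rethrows any failure reported to the callback so the Spark task fails, and can be retried, rather than silently dropping records.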
