Skip to content

Instantly share code, notes, and snippets.

@fujohnwang
Created August 22, 2014 10:04
Show Gist options
  • Save fujohnwang/33f8476c447c9fc747e2 to your computer and use it in GitHub Desktop.
Save fujohnwang/33f8476c447c9fc747e2 to your computer and use it in GitHub Desktop.
A simple-purpose Kafka producer
/**
 * Minimal abstraction for publishing string messages to a named topic.
 */
trait Messenger {

  /**
   * Sends a message that carries no key.
   *
   * @param topic   name of the destination topic
   * @param message message payload
   */
  def send(topic: String, message: String): Unit

  /**
   * Sends a message together with a key.
   *
   * @param topic       name of the destination topic
   * @param messageKey  key associated with the message
   * @param messageBody message payload
   */
  def send(topic: String, messageKey: String, messageBody: String): Unit
}
/**
 * [[Messenger]] implementation that forwards every message to the given Kafka producer.
 *
 * @param producer producer used to publish the messages; this class never closes it
 */
class KafkaMessenger(producer: Producer[String, String]) extends Messenger {

  /** Sends `message` to `topic` without a key by delegating to the keyed overload with a `null` key. */
  override def send(topic: String, message: String): Unit =
    send(topic, null, message)

  /** Wraps the topic, key and body in a `KeyedMessage` and hands it to the producer. */
  override def send(topic: String, messageKey: String, messageBody: String): Unit =
    producer.send(new KeyedMessage[String, String](topic, messageKey, messageBody))
}
/**
 * Spring `FactoryBean` that creates a singleton [[Messenger]] backed by a Kafka
 * `Producer[String, String]` using the string encoder.
 *
 * The producer is created in `afterPropertiesSet()` and closed in `destroy()`.
 *
 * If a more fine-tuned Kafka Producer is needed, subclass this one to make yourself at ease.
 */
class KafkaMessengerFactoryBean extends FactoryBean[Messenger] with InitializingBean with DisposableBean {

  /**
   * Value handed to the producer as `request.required.acks`. Defaults to -1.
   * NOTE(review): per the Kafka 0.8 producer configuration, -1 presumably means "wait for all
   * in-sync replicas" -- confirm against the Kafka version in use.
   */
  @BeanProperty
  var syncMode: Int = -1

  /** Value handed to the producer as `metadata.broker.list`; mandatory. */
  @BeanProperty
  var brokerList: String = _

  protected var messenger: Messenger = _
  protected var producer: Producer[String, String] = _

  /**
   * @return the messenger created by `afterPropertiesSet()`
   * @throws IllegalStateException if `afterPropertiesSet()` has not been called yet
   */
  override def getObject: Messenger =
    if (messenger != null) messenger
    else throw new IllegalStateException("messenger is not initialized yet, call afterPropertiesSet() first.")

  override def getObjectType: Class[_] = classOf[Messenger]

  override def isSingleton: Boolean = true

  /**
   * Validates the configuration, then builds the producer and the messenger wrapping it.
   *
   * @throws IllegalArgumentException if `brokerList` is null or blank
   */
  override def afterPropertiesSet(): Unit = {
    if (brokerList == null || brokerList.trim.isEmpty) throw new IllegalArgumentException("brokerList property must be given.")
    val prop = new Properties
    // Use setProperty with String values only: Properties.getProperty returns null for any
    // non-String value, so storing the Int directly would make the acks setting unreadable.
    prop.setProperty("metadata.broker.list", brokerList)
    prop.setProperty("serializer.class", "kafka.serializer.StringEncoder")
    prop.setProperty("request.required.acks", syncMode.toString)
    producer = new Producer[String, String](new ProducerConfig(prop))
    messenger = new KafkaMessenger(producer)
  }

  /** Closes the producer if one was created; safe to call more than once. */
  override def destroy(): Unit = {
    if (producer != null) {
      producer.close()
      producer = null
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment