Created
August 22, 2014 10:04
-
-
Save fujohnwang/33f8476c447c9fc747e2 to your computer and use it in GitHub Desktop.
Simple-Purpose Kafka Producer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Minimal abstraction for publishing text messages to a named topic.
 */
trait Messenger {

  /**
   * Publishes a message given only a topic and a body.
   *
   * @param topic   name of the destination topic
   * @param message message payload
   */
  def send(topic: String, message: String): Unit

  /**
   * Publishes a message together with an explicit key.
   *
   * @param topic       name of the destination topic
   * @param messageKey  key associated with the message
   * @param messageBody message payload
   */
  def send(topic: String, messageKey: String, messageBody: String): Unit
}
/**
 * [[Messenger]] implementation that forwards every message to the supplied Kafka producer.
 *
 * @param producer producer used for all sends; this class never closes it
 */
class KafkaMessenger(producer: Producer[String, String]) extends Messenger {

  /** Sends `message` to `topic` by delegating to the keyed overload with a `null` key. */
  override def send(topic: String, message: String): Unit = {
    send(topic, null, message)
  }

  /** Wraps topic, key and body in a `KeyedMessage` and hands it to the producer. */
  override def send(topic: String, messageKey: String, messageBody: String): Unit =
    producer.send(new KeyedMessage[String, String](topic, messageKey, messageBody))
}
/**
 * Spring `FactoryBean` that exposes a singleton [[Messenger]] backed by a Kafka producer.
 *
 * The producer is created in `afterPropertiesSet()` from the `brokerList` and `syncMode`
 * bean properties and is closed in `destroy()`.
 *
 * Subclass this factory when a more finely tuned Kafka producer is needed.
 */
class KafkaMessengerFactoryBean extends FactoryBean[Messenger] with InitializingBean with DisposableBean {

  /**
   * Value forwarded to the producer as `request.required.acks`; see the Kafka producer
   * configuration reference for the accepted values.
   */
  @BeanProperty
  var syncMode: Int = -1

  /** Value forwarded to the producer as `metadata.broker.list`; must be non-blank. */
  @BeanProperty
  var brokerList: String = _

  protected var messenger: Messenger = _
  protected var producer: Producer[String, String] = _

  /**
   * Returns the messenger built by `afterPropertiesSet()`.
   *
   * @throws IllegalStateException if no messenger is currently available
   */
  override def getObject: Messenger =
    if (messenger != null) messenger
    else throw new IllegalStateException("messenger is not initialized yet, call afterPropertiesSet() first.")

  override def getObjectType: Class[_] = classOf[Messenger]

  override def isSingleton: Boolean = true

  /**
   * Validates the configuration, then creates the producer and the messenger wrapping it.
   *
   * @throws IllegalArgumentException if `brokerList` is null or blank
   */
  override def afterPropertiesSet(): Unit = {
    if (brokerList == null || brokerList.trim.length == 0) throw new IllegalArgumentException("brokerList property must be given.")
    val prop = new Properties
    // Use setProperty with String values only: Properties.getProperty returns null for
    // any non-String value, so an Int stored through put() would not be readable as a
    // string-typed configuration entry.
    prop.setProperty("metadata.broker.list", brokerList)
    prop.setProperty("serializer.class", "kafka.serializer.StringEncoder")
    prop.setProperty("request.required.acks", syncMode.toString)
    producer = new Producer[String, String](new ProducerConfig(prop))
    messenger = new KafkaMessenger(producer)
  }

  /** Closes the producer, if one was created, and drops the messenger that wraps it. */
  override def destroy(): Unit = {
    if (producer != null) {
      producer.close()
      producer = null
      // The messenger only wraps the producer that was just closed; clear it so that
      // getObject fails fast instead of handing out a messenger that can no longer send.
      messenger = null
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment