Skip to content

Instantly share code, notes, and snippets.

@mstine
Created July 6, 2015 21:40
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mstine/dd58c6c0e6dc717b291d to your computer and use it in GitHub Desktop.
Save mstine/dd58c6c0e6dc717b291d to your computer and use it in GitHub Desktop.
Hello Kafka SI world...
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/kafka
http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
http://www.springframework.org/schema/task
http://www.springframework.org/schema/task/spring-task.xsd">
<!-- Producer Configuration -->
<int:channel id="inputToKafka">
<int:queue/>
</int:channel>
<int-kafka:outbound-channel-adapter
id="kafkaOutboundChannelAdapter"
kafka-producer-context-ref="kafkaProducerContext"
channel="inputToKafka">
<int:poller fixed-delay="1000"
time-unit="MILLISECONDS"
receive-timeout="0"
task-executor="taskExecutor"/>
</int-kafka:outbound-channel-adapter>
<task:executor id="taskExecutor"
pool-size="5"
keep-alive="120"
queue-capacity="500"/>
<bean id="stringEncoder" class="org.springframework.integration.kafka.serializer.common.StringEncoder"/>
<bean id="kafkaEncoder" class="org.springframework.integration.kafka.serializer.avro.AvroSpecificDatumBackedKafkaEncoder">
<constructor-arg value="io.spring.cna.samples.escqrsdemo.avro.MyMessage"/>
</bean>
<int-kafka:producer-context id="kafkaProducerContext">
<int-kafka:producer-configurations>
<int-kafka:producer-configuration
value-class-type="io.spring.cna.samples.escqrsdemo.avro.MyMessage"
value-encoder="kafkaEncoder"
topic="event-stream"
broker-list="localhost:9092"
compression-codec="default"/>
</int-kafka:producer-configurations>
</int-kafka:producer-context>
<!-- Consumer Configuration -->
<int:channel id="outputFromKafka">
<int:queue/>
</int:channel>
<bean id="zkConfiguration" class="org.springframework.integration.kafka.core.ZookeeperConfiguration">
<constructor-arg ref="zookeeperConnect"/>
</bean>
<bean id="kafkaConnectionFactory" class="org.springframework.integration.kafka.core.DefaultConnectionFactory">
<constructor-arg ref="zkConfiguration"/>
</bean>
<int-kafka:message-driven-channel-adapter id="consumerAdapter"
channel="outputFromKafka"
connection-factory="kafkaConnectionFactory"
payload-decoder="kafkaDecoder"
topics="event-stream"/>
<bean id="stringDecoder" class="org.springframework.integration.kafka.serializer.common.StringDecoder"/>
<bean id="kafkaDecoder" class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaDecoder">
<constructor-arg value="io.spring.cna.samples.escqrsdemo.pojo.MyMessage"/>
</bean>
<int-kafka:zookeeper-connect id="zookeeperConnect"
zk-connect="localhost:2181"
zk-connection-timeout="6000"
zk-session-timeout="6000"
zk-sync-time="2000" />
</beans>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment