Created
November 24, 2015 13:34
-
-
Save tiarebalbi/b7da5c9cb06093b75c47 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.xxx.config; | |
import java.util.Collections; | |
import java.util.Map; | |
import java.util.Properties; | |
import org.I0Itec.zkclient.ZkClient; | |
import org.apache.kafka.common.serialization.StringSerializer; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.beans.factory.annotation.Value; | |
import org.springframework.boot.autoconfigure.SpringBootApplication; | |
import org.springframework.boot.builder.SpringApplicationBuilder; | |
import org.springframework.context.ConfigurableApplicationContext; | |
import org.springframework.context.SmartLifecycle; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.context.annotation.Configuration; | |
import org.springframework.expression.common.LiteralExpression; | |
import org.springframework.integration.annotation.ServiceActivator; | |
import org.springframework.integration.channel.QueueChannel; | |
import org.springframework.integration.kafka.core.BrokerAddress; | |
import org.springframework.integration.kafka.core.BrokerAddressListConfiguration; | |
import org.springframework.integration.kafka.core.ConnectionFactory; | |
import org.springframework.integration.kafka.core.DefaultConnectionFactory; | |
import org.springframework.integration.kafka.core.Partition; | |
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter; | |
import org.springframework.integration.kafka.listener.KafkaMessageListenerContainer; | |
import org.springframework.integration.kafka.listener.KafkaTopicOffsetManager; | |
import org.springframework.integration.kafka.listener.OffsetManager; | |
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler; | |
import org.springframework.integration.kafka.serializer.common.StringDecoder; | |
import org.springframework.integration.kafka.support.KafkaProducerContext; | |
import org.springframework.integration.kafka.support.ProducerConfiguration; | |
import org.springframework.integration.kafka.support.ProducerFactoryBean; | |
import org.springframework.integration.kafka.support.ProducerMetadata; | |
import org.springframework.integration.kafka.support.ZookeeperConnect; | |
import org.springframework.messaging.Message; | |
import org.springframework.messaging.MessageChannel; | |
import org.springframework.messaging.MessageHandler; | |
import org.springframework.messaging.PollableChannel; | |
import org.springframework.messaging.support.GenericMessage; | |
import kafka.admin.AdminUtils; | |
import kafka.common.TopicExistsException; | |
import kafka.utils.ZKStringSerializer$; | |
/** | |
* Kafka Context Configuration | |
* | |
* @author Tiarê Balbi Bonamini | |
* @version 1.0.0 | |
*/ | |
@Configuration | |
public class KafkaConfiguration { | |
@Autowired | |
private KafkaConfigurationProperties properties; | |
@Bean | |
@ServiceActivator(inputChannel = "toKafka") | |
public MessageHandler handler() throws Exception { | |
KafkaProducerMessageHandler handler = new KafkaProducerMessageHandler(producerContext()); | |
handler.setTopicExpression(new LiteralExpression(this.properties.getTopic())); | |
handler.setMessageKeyExpression(new LiteralExpression(this.properties.getMessageKey())); | |
return handler; | |
} | |
@Bean | |
public KafkaProducerContext producerContext() throws Exception { | |
KafkaProducerContext kafkaProducerContext = new KafkaProducerContext(); | |
ProducerMetadata<String, String> producerMetadata = new ProducerMetadata<>(this.properties.getTopic(), String.class, String.class, new StringSerializer(), new StringSerializer()); | |
Properties props = new Properties(); | |
props.put("linger.ms", "1000"); | |
ProducerFactoryBean<String, String> producer = new ProducerFactoryBean<>(producerMetadata, this.properties.getBrokerAddress(), props); | |
ProducerConfiguration<String, String> config = new ProducerConfiguration<>(producerMetadata, producer.getObject()); | |
Map<String, ProducerConfiguration<?, ?>> producerConfigurationMap = Collections.<String, ProducerConfiguration<?, ?>>singletonMap(this.properties.getTopic(), config); | |
kafkaProducerContext.setProducerConfigurations(producerConfigurationMap); | |
return kafkaProducerContext; | |
} | |
@Bean | |
public ConnectionFactory kafkaBrokerConnectionFactory() throws Exception { | |
BrokerAddressListConfiguration configuration = new BrokerAddressListConfiguration( | |
BrokerAddress.fromAddress(this.properties.getBrokerAddress())); | |
configuration.setSocketTimeout(500); | |
return new DefaultConnectionFactory(configuration); | |
} | |
@Bean | |
public OffsetManager offsetManager() { | |
return new KafkaTopicOffsetManager(new ZookeeperConnect(this.properties.getZookeeperConnect()), this.properties.getOffsetTopic()); | |
} | |
@Bean | |
public KafkaMessageListenerContainer container(OffsetManager offsetManager) throws Exception { | |
final KafkaMessageListenerContainer kafkaMessageListenerContainer = new KafkaMessageListenerContainer( | |
kafkaBrokerConnectionFactory(), new Partition(this.properties.getTopic(), 0)); | |
kafkaMessageListenerContainer.setOffsetManager(offsetManager); | |
kafkaMessageListenerContainer.setMaxFetch(100); | |
kafkaMessageListenerContainer.setConcurrency(1); | |
return kafkaMessageListenerContainer; | |
} | |
@Bean | |
public KafkaMessageDrivenChannelAdapter adapter(KafkaMessageListenerContainer container) { | |
KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter = | |
new KafkaMessageDrivenChannelAdapter(container); | |
StringDecoder decoder = new StringDecoder(); | |
kafkaMessageDrivenChannelAdapter.setKeyDecoder(decoder); | |
kafkaMessageDrivenChannelAdapter.setPayloadDecoder(decoder); | |
kafkaMessageDrivenChannelAdapter.setOutputChannel(received()); | |
return kafkaMessageDrivenChannelAdapter; | |
} | |
@Bean | |
public PollableChannel received() { | |
return new QueueChannel(); | |
} | |
@Bean | |
public TopicCreator topicCreator() { | |
return new TopicCreator(this.properties.getTopic(), this.properties.getZookeeperConnect()); | |
} | |
public static class TopicCreator implements SmartLifecycle { | |
private final String topic; | |
private final String zkConnect; | |
private volatile boolean running; | |
public TopicCreator(String topic, String zkConnect) { | |
this.topic = topic; | |
this.zkConnect = zkConnect; | |
} | |
@Override | |
public void start() { | |
ZkClient client = new ZkClient(this.zkConnect, 10000, 10000, ZKStringSerializer$.MODULE$); | |
try { | |
AdminUtils.createTopic(client, this.topic, 1, 1, new Properties()); | |
} catch (TopicExistsException e) { | |
} | |
this.running = true; | |
} | |
@Override | |
public void stop() { | |
} | |
@Override | |
public boolean isRunning() { | |
if (this.running) return true; | |
else return false; | |
} | |
@Override | |
public int getPhase() { | |
return Integer.MIN_VALUE; | |
} | |
@Override | |
public boolean isAutoStartup() { | |
return true; | |
} | |
@Override | |
public void stop(Runnable callback) { | |
callback.run(); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright 2013-2014 the original author or authors. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package org.springframework.integration.kafka.outbound; | |
import org.springframework.expression.EvaluationContext; | |
import org.springframework.expression.Expression; | |
import org.springframework.integration.expression.IntegrationEvaluationContextAware; | |
import org.springframework.integration.handler.AbstractMessageHandler; | |
import org.springframework.integration.kafka.support.KafkaHeaders; | |
import org.springframework.integration.kafka.support.KafkaProducerContext; | |
import org.springframework.messaging.Message; | |
import org.springframework.util.Assert; | |
/** | |
* @author Soby Chacko | |
* @author Artem Bilan | |
* @author Gary Russell | |
* @since 0.5 | |
*/ | |
public class KafkaProducerMessageHandler extends AbstractMessageHandler | |
implements IntegrationEvaluationContextAware { | |
private final KafkaProducerContext kafkaProducerContext; | |
private EvaluationContext evaluationContext; | |
private volatile Expression topicExpression; | |
private volatile Expression messageKeyExpression; | |
private volatile Expression partitionExpression; | |
@SuppressWarnings("unchecked") | |
public KafkaProducerMessageHandler(final KafkaProducerContext kafkaProducerContext) { | |
this.kafkaProducerContext = kafkaProducerContext; | |
} | |
@Override | |
public void setIntegrationEvaluationContext(EvaluationContext evaluationContext) { | |
this.evaluationContext = evaluationContext; | |
} | |
public void setTopicExpression(Expression topicExpression) { | |
this.topicExpression = topicExpression; | |
} | |
public void setMessageKeyExpression(Expression messageKeyExpression) { | |
this.messageKeyExpression = messageKeyExpression; | |
} | |
public void setPartitionExpression(Expression partitionExpression) { | |
this.partitionExpression = partitionExpression; | |
} | |
public KafkaProducerContext getKafkaProducerContext() { | |
return this.kafkaProducerContext; | |
} | |
@Override | |
protected void onInit() throws Exception { | |
Assert.notNull(this.evaluationContext); | |
} | |
@Override | |
protected void handleMessageInternal(final Message<?> message) throws Exception { | |
String topic = this.topicExpression != null ? | |
this.topicExpression.getValue(this.evaluationContext, message, String.class) | |
: message.getHeaders().get(KafkaHeaders.TOPIC, String.class); | |
Integer partitionId = this.partitionExpression != null ? | |
this.partitionExpression.getValue(this.evaluationContext, message, Integer.class) | |
: message.getHeaders().get(KafkaHeaders.PARTITION_ID, Integer.class); | |
Object messageKey = this.messageKeyExpression != null | |
? this.messageKeyExpression.getValue(this.evaluationContext, message) | |
: message.getHeaders().get(KafkaHeaders.MESSAGE_KEY); | |
this.kafkaProducerContext.send(topic, partitionId, messageKey, message.getPayload()); | |
} | |
@Override | |
public String getComponentType() { | |
return "kafka:outbound-channel-adapter"; | |
} | |
} |
My idea here is to set up an EvaluationContext on the bean:
https://gist.github.com/tiarebalbi/b7da5c9cb06093b75c47#file-kafkaconfiguration-java-L59-L66
The problem is that I don't know which one I should use.
Hi @tiarebalbi,
Looks like a mismatch between the SI version and SI Kafka.
Try using Spring Integration Kafka 1.3.0.RELEASE. There were some issues with IntegrationEvaluationContextAware
and now that is handled in a different way:
The point is that you should never be required to set the EvaluationContext
explicitly.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Error:
file-kafkaproducermessagehandler-java-L73-L76
Caused by: java.lang.IllegalArgumentException: [Assertion failed] - this argument is required; it must not be null
at org.springframework.util.Assert.notNull(Assert.java:115)
at org.springframework.util.Assert.notNull(Assert.java:126)
at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.onInit(KafkaProducerMessageHandler.java:75)
at org.springframework.integration.context.IntegrationObjectSupport.afterPropertiesSet(IntegrationObjectSupport.java:155)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1637)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1574)
... 52 more