Skip to content

Instantly share code, notes, and snippets.

@tiarebalbi
Created November 24, 2015 13:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tiarebalbi/b7da5c9cb06093b75c47 to your computer and use it in GitHub Desktop.
Save tiarebalbi/b7da5c9cb06093b75c47 to your computer and use it in GitHub Desktop.
package io.xxx.config;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.SmartLifecycle;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.kafka.core.BrokerAddress;
import org.springframework.integration.kafka.core.BrokerAddressListConfiguration;
import org.springframework.integration.kafka.core.ConnectionFactory;
import org.springframework.integration.kafka.core.DefaultConnectionFactory;
import org.springframework.integration.kafka.core.Partition;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.integration.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.integration.kafka.listener.KafkaTopicOffsetManager;
import org.springframework.integration.kafka.listener.OffsetManager;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
import org.springframework.integration.kafka.serializer.common.StringDecoder;
import org.springframework.integration.kafka.support.KafkaProducerContext;
import org.springframework.integration.kafka.support.ProducerConfiguration;
import org.springframework.integration.kafka.support.ProducerFactoryBean;
import org.springframework.integration.kafka.support.ProducerMetadata;
import org.springframework.integration.kafka.support.ZookeeperConnect;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.GenericMessage;
import kafka.admin.AdminUtils;
import kafka.common.TopicExistsException;
import kafka.utils.ZKStringSerializer$;
/**
 * Kafka context configuration.
 *
 * <p>Declares the Spring Integration beans used to publish to and consume from the Kafka topic
 * described by the injected {@code KafkaConfigurationProperties}: an outbound handler fed by the
 * {@code toKafka} channel, a message-driven inbound adapter that writes to the
 * {@link #received()} queue channel, and a {@link TopicCreator} that creates the topic when it
 * is started.
 *
 * @author Tiarê Balbi Bonamini
 * @version 1.0.0
 */
@Configuration
public class KafkaConfiguration {

    @Autowired
    private KafkaConfigurationProperties properties;

    /**
     * Outbound handler subscribed to the {@code toKafka} channel. Topic and message key are fixed
     * literals taken from the configuration properties.
     *
     * @return the handler that forwards messages to Kafka
     * @throws Exception if the producer context cannot be built
     */
    @Bean
    @ServiceActivator(inputChannel = "toKafka")
    public MessageHandler handler() throws Exception {
        KafkaProducerMessageHandler handler = new KafkaProducerMessageHandler(producerContext());
        handler.setTopicExpression(new LiteralExpression(this.properties.getTopic()));
        handler.setMessageKeyExpression(new LiteralExpression(this.properties.getMessageKey()));
        return handler;
    }

    /**
     * Producer context holding a single String/String producer configuration, registered under
     * the configured topic name.
     *
     * @return the producer context used by {@link #handler()}
     * @throws Exception if the underlying producer cannot be created
     */
    @Bean
    public KafkaProducerContext producerContext() throws Exception {
        KafkaProducerContext kafkaProducerContext = new KafkaProducerContext();
        ProducerMetadata<String, String> producerMetadata = new ProducerMetadata<>(
                this.properties.getTopic(), String.class, String.class,
                new StringSerializer(), new StringSerializer());
        Properties props = new Properties();
        // linger.ms is the Kafka producer's batching delay in milliseconds.
        props.put("linger.ms", "1000");
        ProducerFactoryBean<String, String> producer = new ProducerFactoryBean<>(
                producerMetadata, this.properties.getBrokerAddress(), props);
        ProducerConfiguration<String, String> config =
                new ProducerConfiguration<>(producerMetadata, producer.getObject());
        Map<String, ProducerConfiguration<?, ?>> producerConfigurationMap =
                Collections.<String, ProducerConfiguration<?, ?>>singletonMap(
                        this.properties.getTopic(), config);
        kafkaProducerContext.setProducerConfigurations(producerConfigurationMap);
        return kafkaProducerContext;
    }

    /**
     * Connection factory for the configured broker address, with the socket timeout set to 500
     * (presumably milliseconds; confirm against the library documentation).
     *
     * @return the broker connection factory used by the listener container
     * @throws Exception declared for callers; kept from the original signature
     */
    @Bean
    public ConnectionFactory kafkaBrokerConnectionFactory() throws Exception {
        BrokerAddressListConfiguration configuration = new BrokerAddressListConfiguration(
                BrokerAddress.fromAddress(this.properties.getBrokerAddress()));
        configuration.setSocketTimeout(500);
        return new DefaultConnectionFactory(configuration);
    }

    /**
     * Offset manager built from the configured ZooKeeper connect string and offset topic.
     *
     * @return the offset manager handed to the listener container
     */
    @Bean
    public OffsetManager offsetManager() {
        return new KafkaTopicOffsetManager(
                new ZookeeperConnect(this.properties.getZookeeperConnect()),
                this.properties.getOffsetTopic());
    }

    /**
     * Listener container reading partition 0 of the configured topic, with max fetch 100 and
     * concurrency 1.
     *
     * @param offsetManager the offset manager to attach to the container
     * @return the configured listener container
     * @throws Exception if the connection factory cannot be created
     */
    @Bean
    public KafkaMessageListenerContainer container(OffsetManager offsetManager) throws Exception {
        final KafkaMessageListenerContainer kafkaMessageListenerContainer =
                new KafkaMessageListenerContainer(
                        kafkaBrokerConnectionFactory(),
                        new Partition(this.properties.getTopic(), 0));
        kafkaMessageListenerContainer.setOffsetManager(offsetManager);
        kafkaMessageListenerContainer.setMaxFetch(100);
        kafkaMessageListenerContainer.setConcurrency(1);
        return kafkaMessageListenerContainer;
    }

    /**
     * Inbound adapter that decodes keys and payloads with a {@link StringDecoder} and sends the
     * resulting messages to the {@link #received()} channel.
     *
     * @param container the listener container that feeds the adapter
     * @return the message-driven channel adapter
     */
    @Bean
    public KafkaMessageDrivenChannelAdapter adapter(KafkaMessageListenerContainer container) {
        KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter =
                new KafkaMessageDrivenChannelAdapter(container);
        StringDecoder decoder = new StringDecoder();
        kafkaMessageDrivenChannelAdapter.setKeyDecoder(decoder);
        kafkaMessageDrivenChannelAdapter.setPayloadDecoder(decoder);
        kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
        return kafkaMessageDrivenChannelAdapter;
    }

    /**
     * Queue channel that receives the messages consumed from Kafka.
     *
     * @return a new {@link QueueChannel}
     */
    @Bean
    public PollableChannel received() {
        return new QueueChannel();
    }

    /**
     * Lifecycle bean that creates the configured topic through ZooKeeper.
     *
     * @return the topic creator
     */
    @Bean
    public TopicCreator topicCreator() {
        return new TopicCreator(this.properties.getTopic(), this.properties.getZookeeperConnect());
    }

    /**
     * Creates a Kafka topic when started. It auto-starts and reports the lowest possible phase
     * ({@link Integer#MIN_VALUE}).
     */
    public static class TopicCreator implements SmartLifecycle {

        /** Session timeout passed to the {@link ZkClient} constructor. */
        private static final int ZK_SESSION_TIMEOUT_MS = 10000;

        /** Connection timeout passed to the {@link ZkClient} constructor. */
        private static final int ZK_CONNECTION_TIMEOUT_MS = 10000;

        /** Number of partitions requested for the topic. */
        private static final int PARTITIONS = 1;

        /** Replication factor requested for the topic. */
        private static final int REPLICATION_FACTOR = 1;

        private final String topic;

        private final String zkConnect;

        private volatile boolean running;

        /**
         * @param topic name of the topic to create
         * @param zkConnect ZooKeeper connect string used to reach the cluster
         */
        public TopicCreator(String topic, String zkConnect) {
            this.topic = topic;
            this.zkConnect = zkConnect;
        }

        /**
         * Creates the topic and marks this component as running. An already existing topic is
         * treated as success. Any other failure propagates and leaves the component stopped.
         */
        @Override
        public void start() {
            ZkClient client = new ZkClient(this.zkConnect, ZK_SESSION_TIMEOUT_MS,
                    ZK_CONNECTION_TIMEOUT_MS, ZKStringSerializer$.MODULE$);
            try {
                AdminUtils.createTopic(client, this.topic, PARTITIONS, REPLICATION_FACTOR,
                        new Properties());
            } catch (TopicExistsException ignored) {
                // The topic is already there, which is exactly the state we want.
            } finally {
                // The client is only needed for topic creation; release the connection.
                client.close();
            }
            this.running = true;
        }

        /** Marks this component as stopped; there is nothing else to release. */
        @Override
        public void stop() {
            this.running = false;
        }

        @Override
        public boolean isRunning() {
            return this.running;
        }

        @Override
        public int getPhase() {
            return Integer.MIN_VALUE;
        }

        @Override
        public boolean isAutoStartup() {
            return true;
        }

        /**
         * Stops this component and then invokes the callback.
         *
         * @param callback callback to run once the stop has completed
         */
        @Override
        public void stop(Runnable callback) {
            stop();
            callback.run();
        }
    }
}
/*
* Copyright 2013-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.kafka.outbound;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.integration.expression.IntegrationEvaluationContextAware;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.kafka.support.KafkaHeaders;
import org.springframework.integration.kafka.support.KafkaProducerContext;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
/**
 * Message handler that sends each incoming message to Kafka through a
 * {@link KafkaProducerContext}.
 *
 * <p>Topic, partition and message key are each resolved from the corresponding expression when
 * one has been set, and otherwise from the {@link KafkaHeaders} message headers.
 *
 * @author Soby Chacko
 * @author Artem Bilan
 * @author Gary Russell
 * @since 0.5
 */
public class KafkaProducerMessageHandler extends AbstractMessageHandler
        implements IntegrationEvaluationContextAware {

    private final KafkaProducerContext kafkaProducerContext;

    private EvaluationContext evaluationContext;

    private volatile Expression topicExpression;

    private volatile Expression messageKeyExpression;

    private volatile Expression partitionExpression;

    /**
     * @param kafkaProducerContext producer context used to send messages; must not be null
     * @throws IllegalArgumentException if {@code kafkaProducerContext} is null
     */
    public KafkaProducerMessageHandler(final KafkaProducerContext kafkaProducerContext) {
        // Fail at construction time rather than with an NPE on the first send.
        Assert.notNull(kafkaProducerContext, "kafkaProducerContext must not be null");
        this.kafkaProducerContext = kafkaProducerContext;
    }

    /**
     * @param evaluationContext context against which the expressions are evaluated
     */
    @Override
    public void setIntegrationEvaluationContext(EvaluationContext evaluationContext) {
        this.evaluationContext = evaluationContext;
    }

    /**
     * @param topicExpression expression yielding the target topic; when unset, the
     *     {@link KafkaHeaders#TOPIC} header is used
     */
    public void setTopicExpression(Expression topicExpression) {
        this.topicExpression = topicExpression;
    }

    /**
     * @param messageKeyExpression expression yielding the message key; when unset, the
     *     {@link KafkaHeaders#MESSAGE_KEY} header is used
     */
    public void setMessageKeyExpression(Expression messageKeyExpression) {
        this.messageKeyExpression = messageKeyExpression;
    }

    /**
     * @param partitionExpression expression yielding the partition id; when unset, the
     *     {@link KafkaHeaders#PARTITION_ID} header is used
     */
    public void setPartitionExpression(Expression partitionExpression) {
        this.partitionExpression = partitionExpression;
    }

    /**
     * @return the producer context supplied at construction time
     */
    public KafkaProducerContext getKafkaProducerContext() {
        return this.kafkaProducerContext;
    }

    /**
     * Verifies that an evaluation context has been supplied before the handler is used.
     *
     * @throws IllegalArgumentException if no evaluation context has been set
     */
    @Override
    protected void onInit() throws Exception {
        Assert.notNull(this.evaluationContext,
                "evaluationContext must not be null; it is expected to be supplied through "
                        + "setIntegrationEvaluationContext(...) before initialization");
    }

    /**
     * Resolves topic, partition and key for the message and hands the payload to the producer
     * context.
     *
     * @param message the message to publish
     * @throws Exception if expression evaluation or the send fails
     */
    @Override
    protected void handleMessageInternal(final Message<?> message) throws Exception {
        String topic = this.topicExpression != null
                ? this.topicExpression.getValue(this.evaluationContext, message, String.class)
                : message.getHeaders().get(KafkaHeaders.TOPIC, String.class);

        Integer partitionId = this.partitionExpression != null
                ? this.partitionExpression.getValue(this.evaluationContext, message, Integer.class)
                : message.getHeaders().get(KafkaHeaders.PARTITION_ID, Integer.class);

        Object messageKey = this.messageKeyExpression != null
                ? this.messageKeyExpression.getValue(this.evaluationContext, message)
                : message.getHeaders().get(KafkaHeaders.MESSAGE_KEY);

        this.kafkaProducerContext.send(topic, partitionId, messageKey, message.getPayload());
    }

    @Override
    public String getComponentType() {
        return "kafka:outbound-channel-adapter";
    }
}
@tiarebalbi
Copy link
Author

Error:

file-kafkaproducermessagehandler-java-L73-L76

Caused by: java.lang.IllegalArgumentException: [Assertion failed] - this argument is required; it must not be null
at org.springframework.util.Assert.notNull(Assert.java:115)
at org.springframework.util.Assert.notNull(Assert.java:126)
at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.onInit(KafkaProducerMessageHandler.java:75)
at org.springframework.integration.context.IntegrationObjectSupport.afterPropertiesSet(IntegrationObjectSupport.java:155)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1637)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1574)
... 52 more

@tiarebalbi
Copy link
Author

My idea here is to set up an EvaluationContext on the bean:
https://gist.github.com/tiarebalbi/b7da5c9cb06093b75c47#file-kafkaconfiguration-java-L59-L66

The problem is that I don't know which one I should use.

@mbogoevici
Copy link

Hi @tiarebalbi,

Looks like a mismatch between the SI version and SI Kafka.

Try using Spring Integration Kafka 1.3.0.RELEASE. There were some issues with IntegrationEvaluationContextAware and now that is handled in a different way:

https://github.com/spring-projects/spring-integration-kafka/blob/master/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java

@mbogoevici
Copy link

The point is that there should never be a requirement for you to set the EvaluationContext explicitly.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment