Skip to content

Instantly share code, notes, and snippets.

@suikast42
Last active January 26, 2021 07:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save suikast42/367f35f8d27071d616d70803dd41e385 to your computer and use it in GitHub Desktop.
Save suikast42/367f35f8d27071d616d70803dd41e385 to your computer and use it in GitHub Desktop.
Spring Boot -> Kafka Stream -> TimeScaleDB over JTA
Create a standard JTA-based Spring Boot project so that your services support @Transactional.
I avoid using JPA entities here. The JPA specification requires every entity to have an ID, so every entity that you map must have one.
package com.boxbay.wms.bi.batch;
import com.boxbay.wms.bi.configuartion.serde.DeviceMovesSerde;
import com.boxbay.wms.bi.configuartion.topic.TopicConfiguration;
import com.boxbay.wms.bi.model.DeviceMoveTracking;
import com.boxbay.wms.commons.datetime.DateTime;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
 * Consumes batches of Kafka records and writes their values to the database
 * with a single JDBC batch statement, inside a Spring-managed transaction.
 *
 * <p>This is a template: {@code YourModel}, {@code YourModelSerde} and every
 * {@code <...>} marker are placeholders that must be replaced with the
 * project's own model type, deserializer, SQL and parameter binding code.
 *
 * @author: vuru
 * Date: 30.09.2020
 * Time: 06:33
 */
@SuppressWarnings("SqlDialectInspection")
@Repository
@Transactional
public class KafkaToTimeScaleBatchingService {

    /** SQL statement executed once per element of a batch (placeholder). */
    private static final String QUERY = """
            <your query>
            """;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    /**
     * Sends all given models to the database as one JDBC batch using {@link #QUERY}.
     *
     * @param moves the models to persist; one statement execution is queued per element
     * @return the array returned by {@link JdbcTemplate#batchUpdate}, one entry per element
     */
    public int[] batchInsert(final List<YourModel> moves) {
        return this.jdbcTemplate.batchUpdate(QUERY,
                new BatchPreparedStatementSetter() {
                    @Override
                    public void setValues(PreparedStatement ps, int i) throws SQLException {
                        <Your value sets>
                    }

                    @Override
                    public int getBatchSize() {
                        return moves.size();
                    }
                });
    }

    /**
     * Kafka batch listener: extracts the values of the polled records and
     * persists them through {@link #batchInsert(List)}.
     *
     * @param record the batch of consumer records delivered by the listener container
     */
    @KafkaListener(containerFactory = "yourContainerFactory",
            id = "yourID",
            clientIdPrefix = "yourPref",
            topics = "yourTopics",
            autoStartup = "true",
            groupId = "yourGrp"
    )
    @Transactional
    public void processMaterialTracking(List<ConsumerRecord<String, YourModel>> record) {
        try {
            batchInsert(record.stream().map(ConsumerRecord::value).collect(Collectors.toList()));
        } catch (Exception e) {
            // An exception that is swallowed here lets the method return normally, so the
            // surrounding transaction commits. Rethrow (or mark the transaction rollback-only)
            // if the batch must be rolled back.
            <handle exp for doing a rollback or not a rollback should deliver the same dataset again>
        }
    }

    /**
     * Builds the listener container factory referenced by
     * {@link #processMaterialTracking(List)}: batch delivery, concurrency of 5
     * and a poll timeout of 3000 ms.
     *
     * @param consumerFactory the consumer factory bean named {@code yourConsumerFactory}
     * @return the configured container factory
     */
    @Bean("yourContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, YourModel>>
            deviceMovesListener(
                    @Qualifier("yourConsumerFactory") ConsumerFactory<String, YourModel> consumerFactory
    ) {
        // Typed with YourModel so the local matches both the parameter and the return type.
        ConcurrentKafkaListenerContainerFactory<String, YourModel> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(5);
        // Required so the listener receives a List of records instead of single records.
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    /**
     * Builds the Kafka consumer factory: String keys, values decoded by
     * {@code YourModelSerde}, and isolation level {@code read_committed}.
     *
     * <p>NOTE(review): the return type uses {@code DeviceMoveTracking} while the rest of the
     * template uses the {@code YourModel} placeholder; use the same type everywhere when
     * filling in the template.
     *
     * @return the consumer factory used by the listener container factory
     */
    @Bean("yourConsumerFactory")
    public ConsumerFactory<String, DeviceMoveTracking> consumerFactoryDeviceMoves() {
        Map<String, Object> props = new HashMap<>();
        // NOTE(review): bootstrapAddress is not declared in the code shown here; it must be
        // provided by the project (e.g. an injected configuration value).
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // NOTE(review): the configured class must implement Kafka's Deserializer - confirm
        // that the Serde placeholder does.
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, YourModelSerde.class);
        // ISOLATION_LEVEL_CONFIG is the property key; ISOLATION_LEVEL_DOC is only its
        // description text and would leave the isolation level at its default.
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        return new DefaultKafkaConsumerFactory<>(props);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment