Skip to content

Instantly share code, notes, and snippets.

@likhoman
Created June 27, 2020 06:29
Show Gist options
  • Save likhoman/3eb818bfd136762ce47a9c55000a1c65 to your computer and use it in GitHub Desktop.
Save likhoman/3eb818bfd136762ce47a9c55000a1c65 to your computer and use it in GitHub Desktop.
package org.github.likhoman.mtls.client.ws;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.event.inbound.ApplicationEventListeningMessageProducer;
import org.springframework.integration.stomp.ReactorNettyTcpStompSessionManager;
import org.springframework.integration.stomp.StompSessionManager;
import org.springframework.integration.stomp.event.StompIntegrationEvent;
import org.springframework.integration.stomp.inbound.StompInboundChannelAdapter;
import org.springframework.integration.stomp.outbound.StompMessageHandler;
import org.springframework.integration.support.converter.PassThruMessageConverter;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.simp.stomp.ReactorNettyTcpStompClient;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
@Configuration
@EnableIntegration
public class StompConfiguration {
@Value("${stomp.host:127.0.0.1}")
private String stompHost;
@Value("${stomp.port:61613}")
private int stompPort;
@Value("${stomp.topic}")
private String topic;
@Bean
public ReactorNettyTcpStompClient stompClient() {
var stompClient = new ReactorNettyTcpStompClient(stompHost, stompPort);
stompClient.setMessageConverter(new PassThruMessageConverter());
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.afterPropertiesSet();
stompClient.setTaskScheduler(taskScheduler);
stompClient.setReceiptTimeLimit(5000);
return stompClient;
}
@Bean
public StompSessionManager stompSessionManager() {
var stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient());
stompSessionManager.setAutoReceipt(true);
return stompSessionManager;
}
@Bean
public PollableChannel stompInputChannel() {
return new QueueChannel();
}
@Bean
public StompInboundChannelAdapter stompInboundChannelAdapter() {
var adapter =
new StompInboundChannelAdapter(stompSessionManager(), "/topic/" + topic);
adapter.setOutputChannel(stompInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "stompOutputChannel")
public MessageHandler stompMessageHandler() {
var handler = new StompMessageHandler(stompSessionManager());
handler.setDestination("/topic/" + topic);
return handler;
}
@Bean
public PollableChannel stompEvents() {
return new QueueChannel();
}
@Bean
public PollableChannel stompErrors() {
return new QueueChannel();
}
@Bean
public ApplicationListener<ApplicationEvent> stompEventListener() {
var producer = new ApplicationEventListeningMessageProducer();
producer.setEventTypes(StompIntegrationEvent.class);
producer.setOutputChannel(stompEvents());
producer.setErrorChannel(stompErrors());
return producer;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment