Skip to content

Instantly share code, notes, and snippets.

@garyrussell
Last active July 14, 2019 19:17
Show Gist options
  • Save garyrussell/a2548d5cff87f438df41ca12e6438fd0 to your computer and use it in GitHub Desktop.
Save garyrussell/a2548d5cff87f438df41ca12e6438fd0 to your computer and use it in GitHub Desktop.
SO36252276
package com.example;
import java.util.concurrent.Executor;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@SpringBootApplication
@IntegrationComponentScan
public class So36252276Application {
public static void main(String[] args) {
ConfigurableApplicationContext context = new SpringApplicationBuilder(So36252276Application.class)
.web(false)
.run(args);
Gate gateway = context.getBean(Gate.class);
System.out.println(gateway.exchange("good"));
try {
gateway.exchange("fail1"); // sync fail
}
catch (Exception e) {
System.out.println("++++ " + e.getMessage());
}
try {
gateway.exchange("fail2"); // async fail, first leg
}
catch (Exception e) {
System.out.println("++++ " + e.getMessage());
}
System.out.println(gateway.exchange("fail3")); // async fail second leg - not propagated here
context.close();
}
@Bean
public IntegrationFlow main() {
return IntegrationFlows.from(MessageChannels.direct("start"))
.handle("handlers", "first")
.channel(MessageChannels.publishSubscribe("pubsub", exec()))
.get();
}
@Bean
public IntegrationFlow pubsub1() { // first subscriber - his result goes to gateway
return IntegrationFlows.from("pubsub")
.handle("handlers", "second")
.get();
}
@Bean
public IntegrationFlow pubsub2() { // second subscriber
return IntegrationFlows.from("pubsub")
.enrichHeaders(s -> s.header(MessageHeaders.ERROR_CHANNEL, "errors", true))
.handle("handlers", "third")
.handle("handlers", "fourth")
.get();
}
@Bean
public IntegrationFlow errorFlow() {
return IntegrationFlows.from("errors")
.handle("handlers", "errors")
.get();
}
@Bean
public Handlers handlers() {
return new Handlers();
}
@Bean
public Executor exec() {
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(10);
return exec;
}
@MessagingGateway(defaultRequestChannel="start", errorChannel="errors")
public interface Gate {
public String exchange(String foo);
}
public static class Handlers {
public String first(String in) {
System.out.println(Thread.currentThread().getName() + " first:" + in);
if (in.equals("fail1")) {
throw new RuntimeException("failed in 1");
}
else {
return in+in;
}
}
public String second(String in) {
System.out.println(Thread.currentThread().getName() + " second:" + in);
if (in.equals("fail2fail2")) {
throw new RuntimeException("failed in 2");
}
else {
return in+in;
}
}
public String third(String in) {
System.out.println(Thread.currentThread().getName() + " third:" + in);
if (in.equals("fail3fail3")) {
throw new RuntimeException("failed in 3");
}
else {
return in+in;
}
}
public void fourth(String in) {
System.out.println(Thread.currentThread().getName() + " fourth:" + in);
}
public void errors(ErrorMessage in) {
System.out.println(Thread.currentThread().getName() + " " + in);
MessagingException exception = (MessagingException) in.getPayload();
if (exception.getFailedMessage().getHeaders().getErrorChannel() != "errors") {
throw new RuntimeException("Handled error for " + in);
}
else {
System.out.println("Exception occurred on secondary async flow");
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment