Last active
July 14, 2019 19:17
-
-
Save garyrussell/a2548d5cff87f438df41ca12e6438fd0 to your computer and use it in GitHub Desktop.
SO36252276
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example; | |
import java.util.concurrent.Executor; | |
import org.springframework.boot.autoconfigure.SpringBootApplication; | |
import org.springframework.boot.builder.SpringApplicationBuilder; | |
import org.springframework.context.ConfigurableApplicationContext; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.integration.annotation.IntegrationComponentScan; | |
import org.springframework.integration.annotation.MessagingGateway; | |
import org.springframework.integration.dsl.IntegrationFlow; | |
import org.springframework.integration.dsl.IntegrationFlows; | |
import org.springframework.integration.dsl.channel.MessageChannels; | |
import org.springframework.messaging.MessageHeaders; | |
import org.springframework.messaging.MessagingException; | |
import org.springframework.messaging.support.ErrorMessage; | |
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; | |
@SpringBootApplication | |
@IntegrationComponentScan | |
public class So36252276Application { | |
public static void main(String[] args) { | |
ConfigurableApplicationContext context = new SpringApplicationBuilder(So36252276Application.class) | |
.web(false) | |
.run(args); | |
Gate gateway = context.getBean(Gate.class); | |
System.out.println(gateway.exchange("good")); | |
try { | |
gateway.exchange("fail1"); // sync fail | |
} | |
catch (Exception e) { | |
System.out.println("++++ " + e.getMessage()); | |
} | |
try { | |
gateway.exchange("fail2"); // async fail, first leg | |
} | |
catch (Exception e) { | |
System.out.println("++++ " + e.getMessage()); | |
} | |
System.out.println(gateway.exchange("fail3")); // async fail second leg - not propagated here | |
context.close(); | |
} | |
@Bean | |
public IntegrationFlow main() { | |
return IntegrationFlows.from(MessageChannels.direct("start")) | |
.handle("handlers", "first") | |
.channel(MessageChannels.publishSubscribe("pubsub", exec())) | |
.get(); | |
} | |
@Bean | |
public IntegrationFlow pubsub1() { // first subscriber - his result goes to gateway | |
return IntegrationFlows.from("pubsub") | |
.handle("handlers", "second") | |
.get(); | |
} | |
@Bean | |
public IntegrationFlow pubsub2() { // second subscriber | |
return IntegrationFlows.from("pubsub") | |
.enrichHeaders(s -> s.header(MessageHeaders.ERROR_CHANNEL, "errors", true)) | |
.handle("handlers", "third") | |
.handle("handlers", "fourth") | |
.get(); | |
} | |
@Bean | |
public IntegrationFlow errorFlow() { | |
return IntegrationFlows.from("errors") | |
.handle("handlers", "errors") | |
.get(); | |
} | |
@Bean | |
public Handlers handlers() { | |
return new Handlers(); | |
} | |
@Bean | |
public Executor exec() { | |
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor(); | |
exec.setCorePoolSize(10); | |
return exec; | |
} | |
@MessagingGateway(defaultRequestChannel="start", errorChannel="errors") | |
public interface Gate { | |
public String exchange(String foo); | |
} | |
public static class Handlers { | |
public String first(String in) { | |
System.out.println(Thread.currentThread().getName() + " first:" + in); | |
if (in.equals("fail1")) { | |
throw new RuntimeException("failed in 1"); | |
} | |
else { | |
return in+in; | |
} | |
} | |
public String second(String in) { | |
System.out.println(Thread.currentThread().getName() + " second:" + in); | |
if (in.equals("fail2fail2")) { | |
throw new RuntimeException("failed in 2"); | |
} | |
else { | |
return in+in; | |
} | |
} | |
public String third(String in) { | |
System.out.println(Thread.currentThread().getName() + " third:" + in); | |
if (in.equals("fail3fail3")) { | |
throw new RuntimeException("failed in 3"); | |
} | |
else { | |
return in+in; | |
} | |
} | |
public void fourth(String in) { | |
System.out.println(Thread.currentThread().getName() + " fourth:" + in); | |
} | |
public void errors(ErrorMessage in) { | |
System.out.println(Thread.currentThread().getName() + " " + in); | |
MessagingException exception = (MessagingException) in.getPayload(); | |
if (exception.getFailedMessage().getHeaders().getErrorChannel() != "errors") { | |
throw new RuntimeException("Handled error for " + in); | |
} | |
else { | |
System.out.println("Exception occurred on secondary async flow"); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment