MultiThreaded Aggregation With ErrorHandler Per Thread

  • Download Gist
GatewayMultiThreadedAggregationWithErrorHandling.md
Markdown

This example shows how to insert intermediate gateways into a flow, in order to provide a "try/catch"-like block around parts of the flow.

This is necessary when using multi-threaded aggregation with a gateway, so the result (including errors) is returned properly to the original inbound gateway.

It is important to understand that the actual services (and the error transformer) do not send their output to the aggregator directly; by omitting the output-channel, the result (or error) is returned to the intermediate gateway and thence to the aggregator.

EDIT: Added specific tests for one or both messages failing; added a resequencer so the tests are deterministic.

Sample.java
Java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package foo;
 
import java.util.ArrayList;
import java.util.List;
 
import org.springframework.integration.Message;
import org.springframework.integration.MessagingException;
import org.springframework.integration.message.ErrorMessage;
 
/**
* @author Gary Russell
*
*/
public class Sample {
 
public String service(Message<?> message) {
if (message.getPayload().equals("failAll") ||
(message.getHeaders().getSequenceNumber() == 1 && message.getPayload().equals("fail1")) ||
(message.getHeaders().getSequenceNumber() == 2 && message.getPayload().equals("fail2"))) {
throw new RuntimeException("expected");
}
}
 
public List<String> aggregate(List<Message<?>> messages) {
List<String> payloads = new ArrayList<String>();
for (Message<?> message : messages) {
payloads.add((String) message.getPayload());
}
return payloads;
}
 
public String errorHandler(MessagingException me) {
return "err:" + me.getFailedMessage().getPayload();
}
 
public interface MyGateway {
 
List<String> send(String text);
}
 
}
TestCase.java
Java
1 2 3 4 5 6 7 8 9 10 11
/**
 * Sends one request per scenario through the gateway and checks the list
 * that comes back: no failure, first message fails, second message fails,
 * both messages fail.
 */
@Test
public void test() {
    // No failure requested.
    List<String> noFailure = gateway.send("foo");
    assertEquals("[out:foo, out:foo]", noFailure.toString());

    // Only the message with sequence number 1 fails.
    List<String> firstFails = gateway.send("fail1");
    assertEquals("[err:fail1, out:fail1]", firstFails.toString());

    // Only the message with sequence number 2 fails.
    List<String> secondFails = gateway.send("fail2");
    assertEquals("[out:fail2, err:fail2]", secondFails.toString());

    // Both messages fail.
    List<String> bothFail = gateway.send("failAll");
    assertEquals("[err:failAll, err:failAll]", bothFail.toString());
}
sample-context.xml
XML
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
 
<!-- Outer (inbound) gateway: exposes foo.Sample$MyGateway, sends requests to
     "input" and takes its replies from "reply" -->
<int:gateway id="gateway"
service-interface="foo.Sample$MyGateway"
default-reply-channel="reply"
default-request-channel="input"/>
 
<!-- Reply channel of the outer gateway; the aggregator below writes to it -->
<int:channel id="reply" />
<!-- Use a pub-sub channel to insert aggregation headers (apply-sequence="true");
     it is configured with the "exec" task executor defined below -->
 
<int:publish-subscribe-channel id="input" apply-sequence="true" task-executor="exec" />
 
<!-- Thread pool referenced by the "input" channel's task-executor attribute -->
<task:executor id="exec" pool-size="5" />
 
<!-- First error-handling gateway: this subscriber of "input" calls service1gw
     and sends whatever comes back to "agg" -->
<int:service-activator ref="service1gw" input-channel="input" output-channel="agg" />
 
<!-- Intermediate gateway ("try" block): forwards to "input1"; errors go to "ec".
     NOTE(review): no service-interface is declared, so presumably the framework's
     default request/reply interface is used. TODO confirm -->
<int:gateway id="service1gw" default-request-channel="input1" error-channel="ec" />
 
<!-- Second error-handling gateway: same arrangement as the first, via service2gw -->
 
<int:service-activator ref="service2gw" input-channel="input" output-channel="agg" />
 
<!-- Intermediate gateway ("try" block): forwards to "input2"; errors go to "ec" -->
<int:gateway id="service2gw" default-request-channel="input2" error-channel="ec" />
 
<!-- Error flow ("catch" block): both intermediate gateways share this channel -->
<int:channel id="ec" />
<!-- Calls foo.errorHandler; no output-channel is set, so the result goes back
     to the intermediate gateway rather than directly to the aggregator -->
<int:transformer input-channel="ec" ref="foo" method="errorHandler" />
<!-- Resequence so we have deterministic results for tests -->
<int:resequencer input-channel="agg" output-channel="agg2"/>
 
<!-- Aggregates returned results from the error handling gateways by calling
     foo.aggregate, then sends the list to the outer gateway's "reply" channel -->
 
<int:aggregator input-channel="agg2" ref="foo" method="aggregate" output-channel="reply" />
 
<!-- Request channel of service1gw -->
<int:channel id="input1" />
 
<!-- Request channel of service2gw -->
<int:channel id="input2" />
 
<!-- The actual services: both call foo.service and, like the error transformer,
     declare no output-channel -->
<int:service-activator id="service1" input-channel="input1"
ref="foo" method="service" />
<int:service-activator id="service2" input-channel="input2"
ref="foo" method="service" />
 
<!-- bean with methods for services, aggregator and error transformer -->
 
<bean id="foo" class="foo.Sample"/>
 
</beans>

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.