Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save garyrussell/5510524 to your computer and use it in GitHub Desktop.
Save garyrussell/5510524 to your computer and use it in GitHub Desktop.
MultiThreaded Aggregation With ErrorHandler Per Thread

This example shows how to insert intermediate gateways into a flow, in order to provide a "try/catch"-like block around parts of the flow.

This is necessary when using multi-threaded aggregation with a gateway, so the result (including errors) is returned properly to the original inbound gateway.

It is important to understand that the actual services (and the error transformer) do not send their output to the aggregator directly; by omitting the output-channel, the result (or error) is returned to the intermediate gateway and thence to the aggregator.

EDIT: Added specific tests for one or both messages failing; added a resequencer so the tests are deteministic.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<int:gateway id="gateway"
service-interface="foo.Sample$MyGateway"
default-reply-channel="reply"
default-request-channel="input"/>
<int:channel id="reply" />
<!-- use a pub-sub channel to insert aggregation headers -->
<int:publish-subscribe-channel id="input" apply-sequence="true" task-executor="exec" />
<task:executor id="exec" pool-size="5" />
<!-- First error-handling gateway -->
<int:service-activator ref="service1gw" input-channel="input" output-channel="agg" />
<int:gateway id="service1gw" default-request-channel="input1" error-channel="ec" />
<!-- Second error-handling gateway -->
<int:service-activator ref="service2gw" input-channel="input" output-channel="agg" />
<int:gateway id="service2gw" default-request-channel="input2" error-channel="ec" />
<!-- Error flow ("catch" block) -->
<int:channel id="ec" />
<int:transformer input-channel="ec" ref="foo" method="errorHandler" />
<!-- Resequence so we have deterministic results for tests -->
<int:resequencer input-channel="agg" output-channel="agg2"/>
<!-- Aggregates returned results from the error handling gateways -->
<int:aggregator input-channel="agg2" ref="foo" method="aggregate" output-channel="reply" />
<int:channel id="input1" />
<int:channel id="input2" />
<int:service-activator id="service1" input-channel="input1"
ref="foo" method="service" />
<int:service-activator id="service2" input-channel="input2"
ref="foo" method="service" />
<!-- bean with methods for services and transformer -->
<bean id="foo" class="foo.Sample"/>
</beans>
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package foo;
import java.util.ArrayList;
import java.util.List;
import org.springframework.integration.Message;
import org.springframework.integration.MessagingException;
import org.springframework.integration.message.ErrorMessage;
/**
* @author Gary Russell
*
*/
public class Sample {
public String service(Message<?> message) {
if (message.getPayload().equals("failAll") ||
(message.getHeaders().getSequenceNumber() == 1 && message.getPayload().equals("fail1")) ||
(message.getHeaders().getSequenceNumber() == 2 && message.getPayload().equals("fail2"))) {
throw new RuntimeException("expected");
}
}
public List<String> aggregate(List<Message<?>> messages) {
List<String> payloads = new ArrayList<String>();
for (Message<?> message : messages) {
payloads.add((String) message.getPayload());
}
return payloads;
}
public String errorHandler(MessagingException me) {
return "err:" + me.getFailedMessage().getPayload();
}
public interface MyGateway {
List<String> send(String text);
}
}
@Test
public void test() {
List<String> result = gateway.send("foo");
assertEquals("[out:foo, out:foo]", result.toString());
result = gateway.send("fail1");
assertEquals("[err:fail1, out:fail1]", result.toString());
result = gateway.send("fail2");
assertEquals("[out:fail2, err:fail2]", result.toString());
result = gateway.send("failAll");
assertEquals("[err:failAll, err:failAll]", result.toString());
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment