Skip to content

Instantly share code, notes, and snippets.

@frickm
Last active December 26, 2015 07:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save frickm/7114961 to your computer and use it in GitHub Desktop.
Save frickm/7114961 to your computer and use it in GitHub Desktop.
Self-Contained Example of a Camel-multicast that fails
package org.fricke.tests;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.util.List;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.AdviceWithRouteBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
import static org.hamcrest.Matchers.hasSize;
/**
* Created with IntelliJ IDEA.
* User: frick
* Date: 10/2/13
* Time: 9:19 AM
*/
public class CamelMulticastTest extends CamelTestSupport {
@Test
public void testMultiRoute() throws Exception {
context.getRouteDefinition("multiroute").adviceWith(context, new AdviceWithRouteBuilder() {
@Override
public void configure() throws Exception {
interceptSendToEndpoint("direct:d1").skipSendToOriginalEndpoint()
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
List<String> res = Lists.newArrayList("W1");
exchange.getIn().setBody(res);
}
});
interceptSendToEndpoint("direct:d2").skipSendToOriginalEndpoint()
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
List<String> res = Lists.newArrayList("W2");
exchange.getIn().setBody(res);
}
});
weaveAddLast().to("mock:results");
}
});
MockEndpoint ep = getMockEndpoint("mock:results");
ep.expectedMessageCount(1);
context.createProducerTemplate().sendBody("direct:test", "A");
assertMockEndpointsSatisfied();
Exchange exc = ep.getReceivedExchanges().get(0);
List<Object> l = exc.getIn().getBody(List.class);
assertThat(l, hasSize(2));
}
AggregationStrategy aggregationStrategy = new AggregationStrategy() {
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (oldExchange == null) {
List firstResult = newExchange.getIn().getBody(List.class);
newExchange.getIn().setBody(ImmutableList.copyOf(firstResult));
return newExchange;
} else {
List oldResults = oldExchange.getIn().getBody(List.class);
List newResults = newExchange.getIn().getBody(List.class);
ImmutableList aggResult = ImmutableList.copyOf(Iterables.concat(oldResults, newResults));
oldExchange.getIn().setBody(aggResult);
return oldExchange;
}
}
};
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:d1").to("log:x1");
from("direct:d2").to("log:x2");
from("direct:test")
.multicast(aggregationStrategy)
.to("direct:d1", "direct:d2")
.end()
// .log("log:xx")
// comment in to make the test succeed
.routeId("multiroute");
}
};
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment