public
Last active

Self-contained example of a Camel multicast test that fails

  • Download Gist
gistfile1.java
Java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
package org.fricke.tests;
 
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.util.List;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.AdviceWithRouteBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
import static org.hamcrest.Matchers.hasSize;
 
/**
* Created with IntelliJ IDEA.
* User: frick
* Date: 10/2/13
* Time: 9:19 AM
*/
public class CamelMulticastTest extends CamelTestSupport {
@Test
public void testMultiRoute() throws Exception {
context.getRouteDefinition("multiroute").adviceWith(context, new AdviceWithRouteBuilder() {
@Override
public void configure() throws Exception {
interceptSendToEndpoint("direct:d1").skipSendToOriginalEndpoint()
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
List<String> res = Lists.newArrayList("W1");
exchange.getIn().setBody(res);
}
});
interceptSendToEndpoint("direct:d2").skipSendToOriginalEndpoint()
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
List<String> res = Lists.newArrayList("W2");
exchange.getIn().setBody(res);
}
});
weaveAddLast().to("mock:results");
}
});
 
MockEndpoint ep = getMockEndpoint("mock:results");
ep.expectedMessageCount(1);
 
context.createProducerTemplate().sendBody("direct:test", "A");
assertMockEndpointsSatisfied();
 
Exchange exc = ep.getReceivedExchanges().get(0);
List<Object> l = exc.getIn().getBody(List.class);
assertThat(l, hasSize(2));
}
 
AggregationStrategy aggregationStrategy = new AggregationStrategy() {
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (oldExchange == null) {
List firstResult = newExchange.getIn().getBody(List.class);
newExchange.getIn().setBody(ImmutableList.copyOf(firstResult));
return newExchange;
} else {
List oldResults = oldExchange.getIn().getBody(List.class);
List newResults = newExchange.getIn().getBody(List.class);
ImmutableList aggResult = ImmutableList.copyOf(Iterables.concat(oldResults, newResults));
oldExchange.getIn().setBody(aggResult);
return oldExchange;
}
}
};
 
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
 
from("direct:d1").to("log:x1");
from("direct:d2").to("log:x2");
 
from("direct:test")
.multicast(aggregationStrategy)
.to("direct:d1", "direct:d2")
.end()
// .log("log:xx")
// comment in to make the test succeed
.routeId("multiroute");
 
}
};
}
}

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.