Skip to content

Instantly share code, notes, and snippets.

@ecausarano
Last active October 28, 2016 18:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save ecausarano/4b66294464741b9f626890b29ea0aec2 to your computer and use it in GitHub Desktop.
Save ecausarano/4b66294464741b9f626890b29ea0aec2 to your computer and use it in GitHub Desktop.
Test multicasting to Camel pipelines
package com.esc.test;
import org.apache.camel.EndpointInject;
import org.apache.camel.Processor;
import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.JndiRegistry;
import org.apache.camel.impl.LimitedPollingConsumerPollStrategy;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.apache.camel.util.toolbox.AggregationStrategies;
import org.junit.Test;
import java.util.Arrays;
import java.util.function.Function;
import java.util.stream.Stream;
/**
 * Exercises a Camel route that multicasts one message into two pipelines
 * ({@code A -> B} and {@code C -> D}) and combines the branch results with
 * {@link AggregationStrategies#groupedExchange()} before delivering to
 * {@code mock:result}.
 */
public class MulticastPipelinesTest extends CamelTestSupport {

    /** Producer used to send the test message into {@code direct:start}. */
    @Produce(uri = "direct:start")
    ProducerTemplate template;

    /** Mock endpoint at the end of the route on which expectations are set. */
    @EndpointInject(uri = "mock:result")
    MockEndpoint result;

    /**
     * Factory for labelled processors: the returned processor logs the incoming
     * body at debug level and then replaces the body with its own label.
     */
    final Function<String, Processor> processor = (label) -> (exchange) -> {
        log.debug("{} got in={}", label, exchange.getIn().getBody());
        exchange.getIn().setBody(label);
    };

    /**
     * Binds one labelled processor under each of the names {@code A}, {@code B},
     * {@code C} and {@code D} so the route can refer to them by name.
     *
     * @return the registry created by the superclass with the four processors bound
     * @throws Exception if the superclass fails to create the registry
     */
    @Override
    protected JndiRegistry createRegistry() throws Exception {
        JndiRegistry registry = super.createRegistry();
        Stream.of("A", "B", "C", "D").forEach(label ->
                registry.bind(label, processor.apply(label)));
        return registry;
    }

    /**
     * Builds the route under test: {@code direct:start} is multicast to two
     * pipelines and the aggregated outcome is sent to {@code mock:result}.
     *
     * @return the route builder defining the multicast route
     */
    @Override
    protected RouteBuilder createRouteBuilder() {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                // "A".."D" are the names bound in createRegistry(); presumably Camel
                // resolves these scheme-less URIs to the registered processors.
                from("direct:start")
                    .log("after direct:start body=${body}")
                    .multicast().aggregationStrategy(AggregationStrategies.groupedExchange())
                        .pipeline().to("A").to("B").end()
                        .pipeline().to("C").to("D").end()
                    .end()
                    .log("before mock:result body=${body}")
                    .to("mock:result");
            }
        };
    }

    /**
     * Sends a single message through the route and verifies that exactly one
     * message with a non-null body reaches {@code mock:result}.
     *
     * @throws InterruptedException if interrupted while waiting for the mock
     *                              endpoint's expectations to be met
     */
    @Test
    public void test() throws InterruptedException {
        // Expectations are stated explicitly: one message, and a predicate on that
        // message's body. The per-message clause is what evaluates the predicate.
        result.expectedMessageCount(1);
        result.message(0).body().isNotNull();

        template.sendBody("START");

        result.assertIsSatisfied();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment