Skip to content

Instantly share code, notes, and snippets.

@BorisDaich
Last active October 9, 2018 20:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save BorisDaich/b5e5407c0efb010a2670ac6dfd1a464b to your computer and use it in GitHub Desktop.
Save BorisDaich/b5e5407c0efb010a2670ac6dfd1a464b to your computer and use it in GitHub Desktop.
package biz.daich.learning.camel;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.aggregate.GroupedBodyAggregationStrategy;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
/**
 * Camel route test: a message sent to {@code direct:start1} is enriched with a
 * comma-separated body, split on {@code ","} in streaming mode, and the tokens are
 * aggregated in groups of two. The trailing, incomplete group is flushed when the
 * splitter marks its last token as complete.
 */
public class Test_oncompletion_agregate extends CamelTestSupport {

    /**
     * Comma-separated payload produced by the {@code direct:enrich} route.
     * It holds 9 tokens, i.e. 4 full pairs plus 1 leftover token.
     */
    public static final String testBody = "aaa,bbb,ccc,ddd,eee,fff,ggg,hhh,fff";

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new TestRoute();
    }

    /** Defines the route under test plus the helper routes it calls. */
    private static class TestRoute extends RouteBuilder {

        @Override
        public void configure() throws Exception {
            // @formatter:off
            from("direct:start1")
                .enrich("direct:enrich")
                .end() // enrich
                .split()
                    .tokenize(",")
                    .streaming()
                    .log("splited ${in.body}")
                    .aggregate(constant(true), new GroupedBodyAggregationStrategy())
                        .completionSize(2)
                        // The splitter publishes CamelSplitComplete as an exchange PROPERTY, not a
                        // message header, so it has to be read with exchangeProperty(...); a header
                        // lookup is always null and would never flush the last, odd-sized group.
                        // See http://camel.apache.org/splitter.html for CamelSplitComplete.
                        .completionPredicate(exchangeProperty("CamelSplitComplete").isEqualTo(Boolean.TRUE))
                        .log("in agregate ${in.body} and property CamelSplitComplete = ${exchangeProperty.CamelSplitComplete}")
                        .to("direct:agregateSink") // here you can do what ever you want to do with the aggregated messages
                    .end() // end of aggregate
                .end() // end of split
                .log("=== ROUTE 1 ==== DONE")
                .to("direct:start3")
                .to("mock:end1")
                .stop();
            // @formatter:on

            from("direct:start3")
                .log("=========ROUTE 3 started with <${in.body}>")
                .to("mock:end3");

            // Replaces whatever body arrives with the fixed test payload.
            from("direct:enrich")
                .log("enrich requested with <${in.body}>")
                .transform().constant(testBody)
                .to("mock:enrich");

            from("direct:agregateSink")
                .log("GOT agregated exchanges: <${in.body}>")
                .to("mock:agregateSink");
        }
    }

    /**
     * Sends one empty message through route 1 and verifies that both end mocks receive
     * exactly one exchange and that the aggregate sink receives 5 groups
     * (4 pairs completed by size + 1 single token completed by the split-complete flag).
     */
    @Test
    public void test_0() throws Exception {
        log.info("Test started");

        // now set the expectations
        MockEndpoint end1 = getMockEndpoint("mock:end1");
        MockEndpoint end3 = getMockEndpoint("mock:end3");
        MockEndpoint agregateSink = getMockEndpoint("mock:agregateSink");
        assertNotNull(end1);
        assertNotNull(end3);
        assertNotNull(agregateSink);

        end1.expectedMessageCount(1);
        end3.expectedMessageCount(1);
        agregateSink.expectedMessageCount(5);

        // run the context
        context.start();

        // poke the route 1
        template.sendBody("direct:start1", "");

        // check that all expectations on the mocks are as expected
        assertMockEndpointsSatisfied();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment