Last active
October 9, 2018 20:17
-
-
Save BorisDaich/b5e5407c0efb010a2670ac6dfd1a464b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package biz.daich.learning.camel; | |
import org.apache.camel.builder.RouteBuilder; | |
import org.apache.camel.component.mock.MockEndpoint; | |
import org.apache.camel.processor.aggregate.GroupedBodyAggregationStrategy; | |
import org.apache.camel.test.junit4.CamelTestSupport; | |
import org.junit.Test; | |
public class Test_oncompletion_agregate extends CamelTestSupport { | |
@Override | |
protected RouteBuilder createRouteBuilder() throws Exception { | |
return new TestRoute(); | |
} | |
private static class TestRoute extends RouteBuilder { | |
@Override | |
public void configure() throws Exception { | |
// @formatter:off | |
from("direct:start1") | |
.enrich("direct:enrich") | |
.end() // enrich | |
.split() | |
.tokenize(",") | |
.streaming() | |
.log("splited ${in.body}") | |
.aggregate(constant(true), new GroupedBodyAggregationStrategy()) | |
.completionSize(2) | |
.completionPredicate(header("CamelSplitComplete").isEqualTo(Boolean.TRUE)) // aggregate is done when Split is done see http://camel.apache.org/splitter.html for CamelSplitComplete | |
.log("in agregate ${in.body} and header CamelSplitComplete = ${header.CamelSplitComplete}") | |
.to("direct:agregateSink") // here you can do what ever you want to do with the aggregated messages | |
.end() // end of aggregate | |
.end() // end of split | |
.log("=== ROUTE 1 ==== DONE") | |
.to("direct:start3") | |
.to("mock:end1") | |
.stop(); | |
// @formatter:on | |
from("direct:start3") | |
.log("=========ROUTE 3 started with <${in.body}>") | |
.to("mock:end3"); | |
from("direct:enrich") | |
.log("enrich requested with <${in.body}>") | |
.transform().constant(testBody) | |
.to("mock:enrich"); | |
from("direct:agregateSink") | |
.log("GOT agregated exchanges: <${in.body}>") | |
.to("mock:agregateSink"); | |
} | |
} | |
@Test | |
public void test_0() throws Exception { | |
log.info("Test started"); | |
// now set the expectations | |
MockEndpoint end1 = getMockEndpoint("mock:end1"); | |
MockEndpoint end3 = getMockEndpoint("mock:end3"); | |
MockEndpoint agregateSink = getMockEndpoint("mock:agregateSink"); | |
assertNotNull(end1); | |
assertNotNull(end3); | |
assertNotNull(agregateSink); | |
end1.expectedMessageCount(1); | |
end3.expectedMessageCount(1); | |
agregateSink.expectedMessageCount(5); | |
// run the context | |
context.start(); | |
// poke the route 1 | |
template.sendBody("direct:start1", ""); | |
// check that all expectations on the mocks are as expected | |
assertMockEndpointsSatisfied(); | |
} | |
public static final String testBody = "aaa,bbb,ccc,ddd,eee,fff,ggg,hhh,fff"; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment