Skip to content

Instantly share code, notes, and snippets.

@gurpal2000
Created August 4, 2019 14:02
Show Gist options
  • Save gurpal2000/c641c61b6146e10a93e597afb956272c to your computer and use it in GitHub Desktop.
Save gurpal2000/c641c61b6146e10a93e597afb956272c to your computer and use it in GitHub Desktop.
CamelSplitAggTimeoutTest & ArrayListAggregationStrategy - take #2
public class CamelSplitAggTimeoutTest extends CamelTestSupport {
@Test
public void test() throws Exception {
template.sendBody("direct:start", "A,B,C,D,E,F,G,H,I,J,K,L,M,N,O,P,Q,R,S,T,U,V,W,X,Y,Z");
Thread.sleep(10_000);
}
@Override
protected RoutesBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:start")
.onCompletion()
.process(exchange -> {
log.info("We should be done #1");
})
.end()
.split(body().tokenize(",")).streaming()
.aggregate(constant(true), new ArrayListAggregationStrategy())
.completionSize(3)
.completionPredicate(header("CamelSplitComplete"))
.eagerCheckCompletion()
.process(exchange -> {
log.info("batch content: {}", exchange.getIn().getBody());
})
.end()
.end()
.process(exchange -> {
log.info("We should be done #2");
})
;
}
};
}
}
//
public class ArrayListAggregationStrategy implements AggregationStrategy {
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
Object newBody = newExchange.getIn().getBody();
ArrayList<Object> list = null;
if (oldExchange == null) {
list = new ArrayList<Object>();
list.add(newBody);
newExchange.getIn().setBody(list);
return newExchange;
}
else {
list = oldExchange.getIn().getBody(ArrayList.class);
list.add(newBody);
return oldExchange;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment