Skip to content

Instantly share code, notes, and snippets.

@gurpal2000
Last active August 4, 2019 09:46
Show Gist options
  • Save gurpal2000/59283b05f78fbc8fcb6b866c149ef838 to your computer and use it in GitHub Desktop.
Save gurpal2000/59283b05f78fbc8fcb6b866c149ef838 to your computer and use it in GitHub Desktop.
CamelSplitAggTimeoutTest & ArrayListAggregationStrategy
public class CamelSplitAggTimeoutTest extends CamelTestSupport {
@Test
public void test() throws Exception {
template.sendBody("direct:start", "A,B,C,D,E,F,G,H,I,J,K,L,M,N,O,P,Q,R,S,T,U,V,W,X,Y,Z");
Thread.sleep(10_000);
}
@Override
protected RoutesBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:start")
.onCompletion()
.process(exchange -> {
log.info("This will fire before the timeout #1");
})
.end()
.split(body().tokenize(",")).streaming()
.aggregate(constant(true), new ArrayListAggregationStrategy())
.completionTimeout(1_000)
.completionSize(3)
.process(exchange -> {
log.info("CamelSplitIndex: {}", exchange.getProperty("CamelSplitIndex"));
})
.end()
.end()
.process(exchange -> {
log.info("This will fire before the timeout #2");
})
;
}
};
}
}
//
public class ArrayListAggregationStrategy implements AggregationStrategy {
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
Object newBody = newExchange.getIn().getBody();
ArrayList<Object> list = null;
if (oldExchange == null) {
list = new ArrayList<Object>();
list.add(newBody);
newExchange.getIn().setBody(list);
return newExchange;
}
else {
list = oldExchange.getIn().getBody(ArrayList.class);
list.add(newBody);
return oldExchange;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment