Skip to content

Instantly share code, notes, and snippets.

@bedlaj
Last active May 11, 2019 16:25
Show Gist options
  • Save bedlaj/a2a56aa9291bced8c0a8edebacaf22b0 to your computer and use it in GitHub Desktop.
Save bedlaj/a2a56aa9291bced8c0a8edebacaf22b0 to your computer and use it in GitHub Desktop.
package eu.janbednar.stackoverflow;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.apache.camel.util.toolbox.AggregationStrategies;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.util.List;
public class AggregatorCorrelationExpressionTest extends CamelTestSupport {
private final Expression CORRELATION_EXPRESSION = new Expression() {
@Override
public <T> T evaluate(Exchange exchange, Class<T> type) {
final String fileName = exchange.getIn().getHeader(Exchange.FILE_NAME, String.class);
final String correlationExpression = fileName.substring(0, fileName.indexOf('_'));
return exchange.getContext().getTypeConverter().convertTo(
type,
correlationExpression
);
}
};
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
from("file:inputDirectory")
.aggregate(CORRELATION_EXPRESSION, AggregationStrategies.groupedExchange())
.completionSize(2)//Wait for two files
.completionTimeout(60000)//Or process single file, if completionSize was not fullified within one minute
.to("log:do_something")
.to("mock:result")//Here you can do anything. You have here List<Exchange> in message body
;
}
};
}
@Test
public void testCorrelationExpression() throws Exception{
MockEndpoint result = getMockEndpoint("mock:result");
result.expectedMessageCount(2);
result.setResultWaitTime(20000);
Assert.assertTrue(new File("inputDirectory", "group1_1.txt").createNewFile());
Assert.assertTrue(new File("inputDirectory", "group2_1.txt").createNewFile());
Assert.assertTrue(new File("inputDirectory", "group1_2.txt").createNewFile());
Assert.assertTrue(new File("inputDirectory", "group2_2.txt").createNewFile());
result.assertIsSatisfied();
Exchange group1 = result.getReceivedExchanges().get(0);
Exchange group2 = result.getReceivedExchanges().get(1);
Assert.assertEquals(2, group1.getIn().getBody(List.class).size());
Assert.assertEquals(2, group2.getIn().getBody(List.class).size());
if (((List<Exchange>)group1.getIn().getBody(List.class))
.get(0).getIn().getHeader(Exchange.FILE_NAME, String.class)
.startsWith("group1")) {
Assert.assertTrue(
((List<Exchange>)group1.getIn().getBody(List.class)).stream().allMatch(
exchange -> exchange.getIn().getHeader(Exchange.FILE_NAME, String.class).startsWith("group1")
)
);
}
if (((List<Exchange>)group1.getIn().getBody(List.class))
.get(0).getIn().getHeader(Exchange.FILE_NAME, String.class)
.startsWith("group2")) {
Assert.assertTrue(
((List<Exchange>)group1.getIn().getBody(List.class)).stream().allMatch(
exchange -> exchange.getIn().getHeader(Exchange.FILE_NAME, String.class).startsWith("group2")
)
);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment