Skip to content

Instantly share code, notes, and snippets.

@tjayr
Last active November 22, 2015 22:28
Show Gist options
  • Save tjayr/9a1f2580a7d458ec52ce to your computer and use it in GitHub Desktop.
Save tjayr/9a1f2580a7d458ec52ce to your computer and use it in GitHub Desktop.
TestCsvRoute
package com.guru.etl.routes;
package com.guru.etl.routes;
import com.mchange.v2.c3p0.ComboPooledDataSource;
import org.apache.camel.*;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.SimpleRegistry;
import org.apache.camel.model.dataformat.CsvDataFormat;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
 * Camel route test that reads CSV files from {@code /tmp/testdata}, unmarshals them into
 * per-row maps and inserts every row of {@code test.csv} into {@code test_csv_table} via the
 * {@code sql} component, finally forwarding each row to {@code mock:result}.
 *
 * <p>The file route is registered with {@code autoStartup(false)} and is only started once a
 * message is sent to {@code direct:start}.
 *
 * <p>NOTE(review): the test talks to the MySQL database configured in
 * {@link #createMyRegistry()} and expects the input file to exist on disk, so it presumably
 * only passes on a prepared machine.
 *
 * Created by tony on 21/11/15.
 */
public class TestCsvRoute extends CamelTestSupport {

    /** Mock endpoint that receives every row after it has been passed to the SQL endpoint. */
    @EndpointInject(uri = "mock:result")
    protected MockEndpoint resultEndpoint;

    /** Producer used to trigger the start of the file route through {@code direct:start}. */
    @Produce(uri = "direct:start")
    protected ProducerTemplate template;

    /**
     * Overrides the shutdown timeout used by {@link CamelTestSupport}.
     *
     * @return 30 (presumably seconds; confirm against the CamelTestSupport documentation)
     */
    @Override
    protected int getShutdownTimeout() {
        return 30;
    }

    /**
     * Builds the three routes under test: the trigger route, the CSV file route and the
     * row-insert route.
     *
     * @return the route builder registered with the test's Camel context
     */
    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                CsvDataFormat format = new CsvDataFormat();
                format.setLazyLoad(true);
                format.setUseMaps(true);
                format.setSkipHeaderRecord(true);

                // Trigger route: any message on direct:start starts the (initially stopped) file route.
                from("direct:start")
                    .process(exchange -> exchange.getContext().startRoute("fileRoute"));

                from("file:/tmp/testdata?noop=true").routeId("fileRoute")
                    .autoStartup(false)
                    .unmarshal(format)
                    .choice()
                        .when(header(Exchange.FILE_NAME).isEqualTo("test.csv")).to("direct:insert")
                        // "${headers}" replaces the original "{in.header}", which lacked the '$'
                        // and was therefore logged literally instead of being evaluated.
                        .otherwise().log(LoggingLevel.INFO, "dont know what to do ${body} ${headers}");

                from("direct:insert")
                    .split(body()).streaming()
                    .process(exchange -> {
                        // getBody(Map.class) can only hand back a raw Map; the CSV data format is
                        // configured with useMaps=true, so each split element is a row map.
                        @SuppressWarnings("unchecked")
                        Map<String, Object> row = exchange.getIn().getBody(Map.class);
                        exchange.getIn().setBody(emptyValuesToNull(row));
                    })
                    .to("sql: INSERT INTO test_csv_table VALUES (:#Col1, :#Col2, :#Col3, :#Col4, STR_TO_DATE(:#Col5, '%d-%b-%y'), :#Col6)?dataSource=sourceDs")
                    .to("mock:result");
            }
        };
    }

    /**
     * Copies a row, replacing every empty-string value with {@code null} so that empty CSV
     * cells are bound as SQL NULL rather than as empty strings.
     *
     * @param row the row as produced by the CSV unmarshaller, keyed by column name
     * @return a new map with the same keys; values equal to {@code ""} are mapped to {@code null}
     */
    private static Map<String, Object> emptyValuesToNull(Map<String, Object> row) {
        Map<String, Object> cleaned = new HashMap<>(row.size());
        for (Map.Entry<String, Object> entry : row.entrySet()) {
            Object value = entry.getValue();
            // Constant-first equals: a value that is already null must not raise an NPE.
            cleaned.put(entry.getKey(), "".equals(value) ? null : value);
        }
        return cleaned;
    }

    /**
     * Starts the file route and verifies that five rows reach {@code mock:result}.
     */
    @Test
    public void testRoute() throws Exception {
        // Expectations must be registered before any message is sent so the mock endpoint can
        // count arrivals as they happen.
        resultEndpoint.expectedMessageCount(5);

        template.sendBody("direct:start", null);

        // Blocks until the expectation is met or the mock endpoint's wait time elapses, which
        // makes a fixed Thread.sleep unnecessary.
        assertMockEndpointsSatisfied();
    }

    /**
     * Creates the registry holding the beans the routes look up by name: the {@code sourceDs}
     * data source referenced by the SQL endpoint and a {@code transactionManager}.
     *
     * @return a registry populated with the data source and the stub transaction manager
     */
    protected SimpleRegistry createMyRegistry() throws Exception {
        SimpleRegistry registry = new SimpleRegistry();
        // NOTE(review): connection settings and credentials are hard-coded for a local database.
        ComboPooledDataSource source = new ComboPooledDataSource();
        source.setDriverClass("com.mysql.jdbc.Driver");
        source.setJdbcUrl("jdbc:mysql://localhost/gurudev");
        source.setUser("root");
        source.setPassword("admin");
        registry.put("sourceDs", source);
        registry.put("transactionManager", new StubTransactionManager());
        return registry;
    }

    /**
     * Creates the Camel context backed by the registry from {@link #createMyRegistry()}.
     *
     * @return a new default Camel context using the custom registry
     */
    @Override
    protected CamelContext createCamelContext() throws Exception {
        return new DefaultCamelContext(createMyRegistry());
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment