Last active
November 22, 2015 22:28
-
-
Save tjayr/9a1f2580a7d458ec52ce to your computer and use it in GitHub Desktop.
TestCsvRoute
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.guru.etl.routes; | |
package com.guru.etl.routes; | |
import com.mchange.v2.c3p0.ComboPooledDataSource; | |
import org.apache.camel.*; | |
import org.apache.camel.builder.RouteBuilder; | |
import org.apache.camel.component.mock.MockEndpoint; | |
import org.apache.camel.impl.DefaultCamelContext; | |
import org.apache.camel.impl.SimpleRegistry; | |
import org.apache.camel.model.dataformat.CsvDataFormat; | |
import org.apache.camel.test.junit4.CamelTestSupport; | |
import org.junit.Test; | |
import java.util.HashMap; | |
import java.util.Map; | |
import java.util.Set; | |
/** | |
* Created by tony on 21/11/15. | |
*/ | |
public class TestCsvRoute extends CamelTestSupport { | |
@EndpointInject(uri = "mock:result") | |
protected MockEndpoint resultEndpoint; | |
@Produce(uri = "direct:start") | |
protected ProducerTemplate template; | |
@Override | |
protected int getShutdownTimeout() { | |
return 30; | |
} | |
@Override | |
protected RouteBuilder createRouteBuilder() throws Exception { | |
return new RouteBuilder() { | |
@Override | |
public void configure() throws Exception { | |
CsvDataFormat format = new CsvDataFormat(); | |
format.setLazyLoad(true); | |
format.setUseMaps(true); | |
format.setSkipHeaderRecord(true); | |
from("direct:start").process(exchange -> exchange.getContext().startRoute("fileRoute")); | |
from("file:/tmp/testdata?noop=true").routeId("fileRoute") | |
.autoStartup(false) | |
.unmarshal(format) | |
.choice() | |
.when(header(Exchange.FILE_NAME).isEqualTo("test.csv")).to("direct:insert") | |
.otherwise().log(LoggingLevel.INFO, "dont know what to do ${body} {in.header}"); | |
from("direct:insert") | |
.split(body()).streaming() | |
.process(exchange -> { | |
Map<String, Object> data = exchange.getIn().getBody(Map.class); | |
Map<String,Object> newData = new HashMap<String, Object>(data.size()); | |
Set<Map.Entry<String, Object>> values = data.entrySet(); | |
for(Map.Entry<String, Object> entry: values){ | |
if(entry.getValue().equals("")) { | |
newData.put(entry.getKey(), null); | |
} else { | |
newData.put(entry.getKey(), entry.getValue()); | |
} | |
} | |
exchange.getIn().setBody(newData); | |
}). | |
to("sql: INSERT INTO test_csv_table VALUES (:#Col1, :#Col2, :#Col3, :#Col4, STR_TO_DATE(:#Col5, '%d-%b-%y'), :#Col6)?dataSource=sourceDs") | |
.to("mock:result"); | |
} | |
}; | |
} | |
@Test | |
public void testRoute() throws Exception { | |
template.sendBody("direct:start", null); | |
Thread.sleep(3000); | |
System.out.println(resultEndpoint.getExchanges()); | |
getMockEndpoint("mock:result").expectedMessageCount(5); | |
assertMockEndpointsSatisfied(); | |
} | |
protected SimpleRegistry createMyRegistry() throws Exception { | |
SimpleRegistry registry = new SimpleRegistry(); | |
ComboPooledDataSource source = new ComboPooledDataSource(); | |
source.setDriverClass("com.mysql.jdbc.Driver"); | |
source.setJdbcUrl("jdbc:mysql://localhost/gurudev"); | |
source.setUser("root"); | |
source.setPassword("admin"); | |
registry.put("sourceDs", source); | |
registry.put("transactionManager", new StubTransactionManager()); | |
return registry; | |
} | |
@Override | |
protected CamelContext createCamelContext() throws Exception { | |
return new DefaultCamelContext(createMyRegistry()); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment