Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Example CSV to JSON Apache NiFi Custom Processor and tests.
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package datadidit.helpful.hints.processors.csv.converter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import javax.naming.ConfigurationException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder;
@Tags({"csv", "json", "convert"})
@CapabilityDescription("Converts a CSV file into JSON.")
//@WritesAttributes({@WritesAttribute(attribute="", description="")})
//@SeeAlso({})
//@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
public class ConvertCSVToJSON extends AbstractProcessor {
private static final String APPLICATION_JSON = "application/json";
private CsvMapper csvMapper;
private CsvSchema schema;
public static final PropertyDescriptor HEADER = new PropertyDescriptor
.Builder().name("header")
.displayName("header")
.description("Whether or not a header exists in the incoming CSV file.(default true)")
.required(true)
.allowableValues("true", "false")
.defaultValue("true")
.build();
public static final PropertyDescriptor FIELD_NAMES = new PropertyDescriptor
.Builder().name("Field Names")
.displayName("Field Names")
.description("Names of the fields in the CSV if no header exists. Field names must be in order.")
.required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Successfully converted incoming CSV file to JSON")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("Failed to convert incoming CSV file to JSON")
.build();
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
descriptors.add(HEADER);
descriptors.add(FIELD_NAMES);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<Relationship>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
csvMapper = new CsvMapper();
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@OnScheduled
public void onScheduled(final ProcessContext context) throws ConfigurationException {
//Retrieve properties from context
Boolean header = context.getProperty(HEADER).asBoolean();
String fieldNames = context.getProperty(FIELD_NAMES).getValue();
/*
* Create Schema based on properties from user.
*/
if(!header && fieldNames!=null){
Builder build = CsvSchema.builder();
for(String field : fieldNames.split(",")){
build.addColumn(field, CsvSchema.ColumnType.NUMBER_OR_STRING);
}
schema = build.build();
}else if(header && fieldNames!=null && !fieldNames.equals("")){
schema = this.buildCsvSchema(fieldNames, header);
}else if(!header && fieldNames==null){
throw new ConfigurationException("File must either contain headers or you must provide them..");
}else{
schema = CsvSchema.emptySchema().withHeader();
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if ( flowFile == null ) {
return;
}
try {
//Read in Data
InputStream stream = session.read(flowFile);
String csv = IOUtils.toString(stream, "UTF-8");
stream.close();
//Convert CSV data to JSON
List<Map<?,?>> objects = this.readObjectsFromCsv(csv);
//Convert to JSON String
String json = this.writeAsJson(objects);
//Output Flowfile
FlowFile output = session.write(flowFile, new OutputStreamCallback(){
@Override
public void process(OutputStream outputStream) throws IOException {
IOUtils.write(json, outputStream, "UTF-8");
}
});
output = session.putAttribute(output, CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
//TODO: May want to have a better default name....
output = session.putAttribute(output, CoreAttributes.FILENAME.key(), UUID.randomUUID().toString()+".json");
session.transfer(output, REL_SUCCESS);
} catch (IOException e) {
getLogger().error("Unable to process Change CSV to JSON for this file "+flowFile.getAttributes().get(CoreAttributes.FILENAME));
session.transfer(flowFile, REL_FAILURE);
}
}
//TODO: Make a utility library or something so that Camel version and NiFi version
//Can just use the same code
public List<Map<?,?>> readObjectsFromCsv(String fileContent) throws JsonProcessingException, IOException{
CsvMapper csvMapper = new CsvMapper();
MappingIterator<Map<?, ?>> mappingIterator = csvMapper.readerFor(Map.class).with(schema).readValues(fileContent);
return this.fixMap(mappingIterator.readAll());
}
//TODO: This is a HACK, use library or submit bug
public List<Map<?,?>> fixMap(List<Map<?,?>> map){
List<Map<?,?>> newList = new ArrayList<>();
for(Map<?, ?> entry : map){
Map<String,Object> newMap = new HashMap<String,Object>();
for(Map.Entry<?, ?> mEntry : entry.entrySet()){
String value = mEntry.getValue().toString();
//Need to remove leading . for isNumeric to work with Doubles
if(value.startsWith(".") && StringUtils.isNumeric(value.substring(1))){
newMap.put(mEntry.getKey().toString(), Double.parseDouble(value));
}else if(StringUtils.isNumeric(mEntry.getValue().toString())){
newMap.put(mEntry.getKey().toString(), Integer.parseInt(value));
}else{
newMap.put(mEntry.getKey().toString(), mEntry.getValue().toString());
}
}
newList.add(newMap);
}
return newList;
}
public String writeAsJson(List<Map<?, ?>> data) throws IOException {
ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsString(data);
}
//TODO: Should probably do it this way at some point....
private CsvSchema buildCsvSchema(String fieldNames, Boolean withHeader){
Builder build = CsvSchema.builder();
for(String field : fieldNames.split(",")){
String[] fieldWithType = field.split("#");
if(fieldWithType.length==2){
getLogger().info("Field: "+fieldWithType[0]);
getLogger().info("Type: "+fieldWithType[1]);
build.addColumn(fieldWithType[0], CsvSchema.ColumnType.valueOf(fieldWithType[1]));
}else{
build.addColumn(field);
}
}
if(withHeader){
return build.build().withHeader();
}
return build.build();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package datadidit.helpful.hints.processors.csv.converter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.UnsupportedEncodingException;
import java.util.List;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
public class ConvertCSVToJsonTest {
private TestRunner testRunner;
@Before
public void init() {
testRunner = TestRunners.newTestRunner(ConvertCSVToJSON.class);
}
@Test
public void testWithHeader() throws FileNotFoundException, UnsupportedEncodingException {
//Set Headers
testRunner.setProperty(ConvertCSVToJSON.HEADER, "true");
testRunner.enqueue(new FileInputStream(new File("src/test/resources/WithHeader.csv")));
testRunner.run();
testRunner.assertAllFlowFilesTransferred(ConvertCSVToJSON.REL_SUCCESS, 1);
List<MockFlowFile> successFiles = testRunner.getFlowFilesForRelationship(ConvertCSVToJSON.REL_SUCCESS);
for(MockFlowFile mockFile : successFiles){
System.out.println(new String(mockFile.toByteArray(), "UTF-8"));
}
}
@Test
public void testNoHeader() throws FileNotFoundException, UnsupportedEncodingException {
//Set Headers
testRunner.setProperty(ConvertCSVToJSON.HEADER, "false");
testRunner.setProperty(ConvertCSVToJSON.FIELD_NAMES, "firstName,lastName,dob");
testRunner.enqueue(new FileInputStream(new File("src/test/resources/NoHeader.csv")));
testRunner.run();
testRunner.assertAllFlowFilesTransferred(ConvertCSVToJSON.REL_SUCCESS, 1);
List<MockFlowFile> successFiles = testRunner.getFlowFilesForRelationship(ConvertCSVToJSON.REL_SUCCESS);
for(MockFlowFile mockFile : successFiles){
System.out.println(new String(mockFile.toByteArray(), "UTF-8"));
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.
You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session.