Created
September 6, 2017 21:23
-
-
Save pvillard31/408c6ba3a9b53880c751a35cffa9ccea to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.stream.io.StreamUtils;
import org.json.JSONObject;
import org.json.XML;
@SupportsBatching | |
@Tags({"json", "xml", "conversion"}) | |
@InputRequirement(Requirement.INPUT_REQUIRED) | |
@CapabilityDescription("This processor converts XML data to JSON.") | |
public class XmlToJson extends AbstractProcessor { | |
public static final Relationship CONVERTED = new Relationship.Builder() | |
.name("converted") | |
.build(); | |
public static final Relationship FAILURE = new Relationship.Builder() | |
.name("failure") | |
.build(); | |
private List<PropertyDescriptor> descriptors; | |
private Set<Relationship> relationships; | |
@Override | |
protected void init(final ProcessorInitializationContext context) { | |
final List<PropertyDescriptor> descriptors = new ArrayList<>(); | |
this.descriptors = Collections.unmodifiableList(descriptors); | |
final Set<Relationship> relationships = new HashSet<>(); | |
relationships.add(CONVERTED); | |
relationships.add(FAILURE); | |
this.relationships = Collections.unmodifiableSet(relationships); | |
} | |
@Override | |
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { | |
return descriptors; | |
} | |
@Override | |
public Set<Relationship> getRelationships() { | |
return relationships; | |
} | |
@Override | |
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { | |
FlowFile original = session.get(); | |
if(original == null) { | |
return; | |
} | |
try { | |
// reading all the content of the input flow file | |
final byte[] byteBuffer = new byte[(int) original.getSize()]; | |
session.read(original, new InputStreamCallback() { | |
@Override | |
public void process(InputStream in) throws IOException { | |
StreamUtils.fillBuffer(in, byteBuffer, false); | |
} | |
}); | |
// convert the content into a JSON object | |
final String contentString = new String(byteBuffer, 0, byteBuffer.length, Charset.forName("UTF-8")); | |
final JSONObject xmlJSONObj = XML.toJSONObject(contentString); | |
final String json = xmlJSONObj.toString(); | |
// write the JSON data in the content of the flow file | |
original = session.write(original, new OutputStreamCallback() { | |
@Override | |
public void process(final OutputStream out) throws IOException { | |
out.write(json.getBytes()); | |
} | |
}); | |
original = session.putAttribute(original, "mime.type", "application/json"); | |
session.getProvenanceReporter().modifyContent(original); | |
session.transfer(original, CONVERTED); | |
} catch (Exception e) { | |
session.transfer(original, FAILURE); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment