Skip to content

Instantly share code, notes, and snippets.

@pvillard31
Created September 6, 2017 21:23
Show Gist options
  • Save pvillard31/408c6ba3a9b53880c751a35cffa9ccea to your computer and use it in GitHub Desktop.
Save pvillard31/408c6ba3a9b53880c751a35cffa9ccea to your computer and use it in GitHub Desktop.
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.stream.io.StreamUtils;
import org.json.JSONObject;
import org.json.XML;
@SupportsBatching
@Tags({"json", "xml", "conversion"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("This processor converts XML data to JSON.")
public class XmlToJson extends AbstractProcessor {
public static final Relationship CONVERTED = new Relationship.Builder()
.name("converted")
.build();
public static final Relationship FAILURE = new Relationship.Builder()
.name("failure")
.build();
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(CONVERTED);
relationships.add(FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile original = session.get();
if(original == null) {
return;
}
try {
// reading all the content of the input flow file
final byte[] byteBuffer = new byte[(int) original.getSize()];
session.read(original, new InputStreamCallback() {
@Override
public void process(InputStream in) throws IOException {
StreamUtils.fillBuffer(in, byteBuffer, false);
}
});
// convert the content into a JSON object
final String contentString = new String(byteBuffer, 0, byteBuffer.length, Charset.forName("UTF-8"));
final JSONObject xmlJSONObj = XML.toJSONObject(contentString);
final String json = xmlJSONObj.toString();
// write the JSON data in the content of the flow file
original = session.write(original, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
out.write(json.getBytes());
}
});
original = session.putAttribute(original, "mime.type", "application/json");
session.getProvenanceReporter().modifyContent(original);
session.transfer(original, CONVERTED);
} catch (Exception e) {
session.transfer(original, FAILURE);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment