Skip to content

Instantly share code, notes, and snippets.

Created April 19, 2018 12:34
Show Gist options
  • Save dgrahn/37c2b40e1b7d460fcc8e0bc6c99a9e33 to your computer and use it in GitHub Desktop.
Save dgrahn/37c2b40e1b7d460fcc8e0bc6c99a9e33 to your computer and use it in GitHub Desktop.
NiFi Attributes to JSON with Support for JSON Attributes
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
package org.apache.nifi.processors.standard;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@SuppressWarnings({ "javadoc", "hiding", "static-method", "unchecked" })
@Tags({ "json", "attributes", "flowfile" })
@CapabilityDescription("Generates a JSON representation of the input FlowFile Attributes. The resulting JSON " +
"can be written to either a new Attribute 'JSONAttributes' or written to the FlowFile as content.")
@WritesAttribute(attribute = "JSONAttributes", description = "JSON representation of Attributes")
public class NestedAttributesToJSON extends AbstractProcessor {
public static final String JSON_ATTRIBUTE_NAME = "JSONAttributes";
private static final String AT_LIST_SEPARATOR = ",";
public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute";
public static final String DESTINATION_CONTENT = "flowfile-content";
public static final String APPLICATION_JSON = "application/json";
public static final PropertyDescriptor ATTRIBUTES_LIST = new PropertyDescriptor.Builder()
.name("Attributes List")
.description("Comma separated list of attributes to be included in the resulting JSON. If this value " +
"is left empty then all existing Attributes will be included. This list of attributes is " +
"case sensitive. If an attribute specified in the list is not found it will be be emitted " +
"to the resulting JSON with an empty string or NULL value.")
public static final PropertyDescriptor ATTRIBUTES_REGEX = new PropertyDescriptor.Builder()
.displayName("Attributes Regular Expression")
.description("Regular expression that will be evaluated against the flow file attributes to select "
+ "the matching attributes. This property can be used in combination with the attributes "
+ "list property.")
.addValidator(StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true))
public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder()
.description("Control if JSON value is written as a new flowfile attribute '" + JSON_ATTRIBUTE_NAME + "' " +
"or written in the flowfile content. Writing to flowfile content will overwrite any " +
"existing flowfile content.")
public static final PropertyDescriptor INCLUDE_CORE_ATTRIBUTES = new PropertyDescriptor.Builder()
.name("Include Core Attributes")
.description("Determines if the FlowFile org.apache.nifi.flowfile.attributes.CoreAttributes which are " +
"contained in every FlowFile should be included in the final JSON value generated.")
.allowableValues("true", "false")
public static final PropertyDescriptor NULL_VALUE_FOR_EMPTY_STRING = new PropertyDescriptor.Builder()
.name(("Null Value"))
"If true a non existing or empty attribute will be NULL in the resulting JSON. If false an empty " +
"string will be placed in the JSON")
.allowableValues("true", "false")
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("Successfully converted attributes to JSON").build();
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("Failed to convert attributes to JSON").build();
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
private static final ObjectMapper objectMapper = new ObjectMapper();
private volatile Set<String> attributesToRemove;
private volatile Set<String> attributes;
private volatile Boolean nullValueForEmptyString;
private volatile boolean destinationContent;
private volatile Pattern pattern;
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(NULL_VALUE_FOR_EMPTY_STRING); = Collections.unmodifiableList(properties);
final Set<Relationship> relationships = new HashSet<>();
this.relationships = Collections.unmodifiableSet(relationships);
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
public Set<Relationship> getRelationships() {
return relationships;
private static Object tryJson(final String value) {
try {
return objectMapper.readValue(value, JsonNode.class);
} catch (final Exception e) {
return value;
* Builds the Map of attributes that should be included in the JSON that is emitted from this process.
* @return
* Map of values that are feed to a Jackson ObjectMapper
protected Map<String, Object> buildAttributesMapForFlowFile(
final FlowFile ff,
final Set<String> attributes,
final Set<String> attributesToRemove,
final boolean nullValForEmptyString,
final Pattern attPattern) {
Map<String, Object> result;
//If list of attributes specified get only those attributes. Otherwise write them all
if (attributes != null || attPattern != null) {
result = new HashMap<>();
if (attributes != null) {
for (final String attribute : attributes) {
final String val = ff.getAttribute(attribute);
if (val != null || nullValForEmptyString) {
result.put(attribute, tryJson(val));
} else {
result.put(attribute, "");
if (attPattern != null) {
for (final Map.Entry<String, String> e : ff.getAttributes().entrySet()) {
if (attPattern.matcher(e.getKey()).matches()) {
result.put(e.getKey(), e.getValue());
} else {
final Map<String, String> ffAttributes = ff.getAttributes();
result = new HashMap<>(ffAttributes.size());
for (final Map.Entry<String, String> e : ffAttributes.entrySet()) {
if (!attributesToRemove.contains(e.getKey())) {
result.put(e.getKey(), tryJson(e.getValue()));
return result;
private Set<String> buildAtrs(final String atrList, final Set<String> atrsToExclude) {
//If list of attributes specified get only those attributes. Otherwise write them all
if (StringUtils.isNotBlank(atrList)) {
final String[] ats = StringUtils.split(atrList, AT_LIST_SEPARATOR);
if (ats != null) {
final Set<String> result = new HashSet<>(ats.length);
for (final String str : ats) {
final String trim = str.trim();
if (!atrsToExclude.contains(trim)) {
return result;
return null;
public void onScheduled(final ProcessContext context) {
attributesToRemove = context.getProperty(INCLUDE_CORE_ATTRIBUTES).asBoolean() ? Collections.EMPTY_SET
attributes = buildAtrs(context.getProperty(ATTRIBUTES_LIST).getValue(), attributesToRemove);
nullValueForEmptyString = context.getProperty(NULL_VALUE_FOR_EMPTY_STRING).asBoolean();
destinationContent = DESTINATION_CONTENT.equals(context.getProperty(DESTINATION).getValue());
if (context.getProperty(ATTRIBUTES_REGEX).isSet()) {
pattern = Pattern.compile(context.getProperty(ATTRIBUTES_REGEX).evaluateAttributeExpressions().getValue());
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final FlowFile original = session.get();
if (original == null) {
final Map<String, Object> atrList = buildAttributesMapForFlowFile(original, attributes, attributesToRemove,
nullValueForEmptyString, pattern);
try {
if (destinationContent) {
FlowFile conFlowfile = session.write(original, (in, out) -> {
try (OutputStream outputStream = new BufferedOutputStream(out)) {
conFlowfile = session.putAttribute(conFlowfile, CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
session.transfer(conFlowfile, REL_SUCCESS);
} else {
final FlowFile atFlowfile = session.putAttribute(original, JSON_ATTRIBUTE_NAME,
session.transfer(atFlowfile, REL_SUCCESS);
} catch (final JsonProcessingException e) {
session.transfer(original, REL_FAILURE);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment