Created March 21, 2016 17:52
-
-
Save mattyb149/478864017ec70d76f74f to your computer and use it in GitHub Desktop.
A NiFi template that uses Groovy to parse an attribute containing JSON and create a new attribute from one of the JSON fields.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8" standalone="yes"?><!-- NiFi flow template "ParseJsonInAttribute". Connections wire the processors as: GenerateFlowFileWithContent -> UpdateAttribute (sets the kafka.key attribute to a JSON string) -> ExecuteScript (Groovy; copies data.myKey from that JSON into a myKey attribute) -> LogAttribute. All connections carry the "success" relationship. --><template><description></description><name>ParseJsonInAttribute</name><snippet><connections><id>906cd6cd-343f-45d6-8e08-6306ef089773</id><parentGroupId>08ae6f8e-de9b-4014-ab98-2c415b5e9d2a</parentGroupId><backPressureDataSizeThreshold>0 MB</backPressureDataSizeThreshold><backPressureObjectThreshold>0</backPressureObjectThreshold><destination><groupId>08ae6f8e-de9b-4014-ab98-2c415b5e9d2a</groupId><id>3382f8b0-d0b0-41c6-bca7-8bd546a6be82</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>success</selectedRelationships><source><groupId>08ae6f8e-de9b-4014-ab98-2c415b5e9d2a</groupId><id>77b91aad-5389-4bdf-876b-5015faccfe9c</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>5b1376bf-e6d6-456c-9e83-444f554b25d8</id><parentGroupId>08ae6f8e-de9b-4014-ab98-2c415b5e9d2a</parentGroupId><backPressureDataSizeThreshold>0 MB</backPressureDataSizeThreshold><backPressureObjectThreshold>0</backPressureObjectThreshold><destination><groupId>08ae6f8e-de9b-4014-ab98-2c415b5e9d2a</groupId><id>b9663547-8ca1-43e5-bd11-5f3ba3029b27</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>success</selectedRelationships><source><groupId>08ae6f8e-de9b-4014-ab98-2c415b5e9d2a</groupId><id>947b45b0-37cd-473d-b5cf-7fbd3f143da7</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>cd8aaf0e-2410-4c95-906e-d67e12168e3f</id><parentGroupId>08ae6f8e-de9b-4014-ab98-2c415b5e9d2a</parentGroupId><backPressureDataSizeThreshold>0 MB</backPressureDataSizeThreshold><backPressureObjectThreshold>0</backPressureObjectThreshold><destination><groupId>08ae6f8e-de9b-4014-ab98-2c415b5e9d2a</groupId><id>77b91aad-5389-4bdf-876b-5015faccfe9c</id><type>PROCESSOR</type></destination><flowFileExpiration>0 
sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>success</selectedRelationships><source><groupId>08ae6f8e-de9b-4014-ab98-2c415b5e9d2a</groupId><id>b9663547-8ca1-43e5-bd11-5f3ba3029b27</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><!-- GenerateFlowFileWithContent: an InvokeScriptedProcessor whose inline Groovy Processor creates one FlowFile per trigger (timer-driven, every 3 sec) from the File Content / Filename properties. --><processors><id>947b45b0-37cd-473d-b5cf-7fbd3f143da7</id><parentGroupId>08ae6f8e-de9b-4014-ab98-2c415b5e9d2a</parentGroupId><position><x>593.0</x><y>441.0</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><defaultConcurrentTasks><entry><key>TIMER_DRIVEN</key><value>1</value></entry><entry><key>EVENT_DRIVEN</key><value>0</value></entry><entry><key>CRON_DRIVEN</key><value>1</value></entry></defaultConcurrentTasks><defaultSchedulingPeriod><entry><key>TIMER_DRIVEN</key><value>0 sec</value></entry><entry><key>CRON_DRIVEN</key><value>* * * * * ?</value></entry></defaultSchedulingPeriod><descriptors><entry><key>Script Engine</key><value><allowableValues><displayName>ECMAScript</displayName><value>ECMAScript</value></allowableValues><allowableValues><displayName>Groovy</displayName><value>Groovy</value></allowableValues><allowableValues><displayName>lua</displayName><value>lua</value></allowableValues><allowableValues><displayName>python</displayName><value>python</value></allowableValues><allowableValues><displayName>ruby</displayName><value>ruby</value></allowableValues><defaultValue>ECMAScript</defaultValue><description>The engine to execute scripts</description><displayName>Script Engine</displayName><dynamic>false</dynamic><name>Script Engine</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Script File</key><value><description>Path to script file to execute. 
Only one of Script File or Script Body may be used</description><displayName>Script File</displayName><dynamic>false</dynamic><name>Script File</name><required>false</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry><entry><key>Script Body</key><value><description>Body of script to execute. Only one of Script File or Script Body may be used</description><displayName>Script Body</displayName><dynamic>false</dynamic><name>Script Body</name><required>false</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Module Directory</key><value><description>Comma-separated list of paths to files and/or directories which contain modules required by the script.</description><displayName>Module Directory</displayName><dynamic>false</dynamic><name>Module Directory</name><required>false</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>File Content</key><value><description>The content for the generated flow file</description><displayName>File Content</displayName><dynamic>false</dynamic><name>File Content</name><required>false</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry><entry><key>Evaluate Expressions in Content</key><value><allowableValues><displayName>true</displayName><value>true</value></allowableValues><allowableValues><displayName>false</displayName><value>false</value></allowableValues><defaultValue>false</defaultValue><description>Whether to evaluate NiFi Expression Language constructs within the content</description><displayName>Evaluate Expressions in Content</displayName><dynamic>false</dynamic><name>Evaluate Expressions in Content</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Filename</key><value><description>The name of the flow file to be stored in the filename 
attribute</description><displayName>Filename</displayName><dynamic>false</dynamic><name>Filename</name><required>false</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry></descriptors><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Script Engine</key><value>Groovy</value></entry><entry><key>Script File</key></entry><entry><key>Script Body</key><value><![CDATA[/*
 * GenerateFlowFileWithContent: a source Processor executed by InvokeScriptedProcessor.
 * Every onTrigger creates exactly one FlowFile whose content is the 'File Content' property
 * encoded as UTF-8 (Expression Language is evaluated only when 'Evaluate Expressions in Content'
 * is true; an unset value yields empty content), optionally sets the 'filename' attribute from
 * the 'Filename' property, and transfers it to 'success'.
 * The script body is wrapped in CDATA so generic types and the closure arrow need no XML escaping.
 */
class GenerateFlowFileWithContent implements Processor {

    def REL_SUCCESS = new Relationship.Builder()
            .name('success')
            .description('The flow file with the specified content and/or filename was successfully transferred')
            .build()

    // Content written to every generated FlowFile (literal text, or EL when CONTENT_HAS_EL is true).
    def CONTENT = new PropertyDescriptor.Builder()
            .name('File Content').description('The content for the generated flow file')
            .required(false).expressionLanguageSupported(true).addValidator(Validator.VALID).build()

    // Switch deciding whether CONTENT is run through Expression Language evaluation.
    def CONTENT_HAS_EL = new PropertyDescriptor.Builder()
            .name('Evaluate Expressions in Content').description('Whether to evaluate NiFi Expression Language constructs within the content')
            .required(true).allowableValues('true', 'false').defaultValue('false').build()

    // Optional value for the 'filename' attribute; EL is always evaluated for it.
    def FILENAME = new PropertyDescriptor.Builder()
            .name('Filename').description('The name of the flow file to be stored in the filename attribute')
            .required(false).expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build()

    @Override
    void initialize(ProcessorInitializationContext context) { }

    @Override
    Set<Relationship> getRelationships() { return [REL_SUCCESS] as Set }

    /**
     * Creates, fills and transfers one FlowFile inside a session created here.
     * This class implements Processor directly (not AbstractProcessor), so it owns the session:
     * it commits on success and rolls back on any failure before rethrowing.
     *
     * @throws ProcessException wrapping any failure (an existing ProcessException is rethrown as-is)
     */
    @Override
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        def session = null
        try {
            session = sessionFactory.createSession()
            def flowFile = session.create()
            def hasEL = context.getProperty(CONTENT_HAS_EL).asBoolean()
            def contentProp = context.getProperty(CONTENT)
            def content = (hasEL ? contentProp.evaluateAttributeExpressions().value : contentProp.value) ?: ''
            def filename = context.getProperty(FILENAME)?.evaluateAttributeExpressions()?.getValue()
            flowFile = session.write(flowFile, { outStream ->
                outStream.write(content.getBytes("UTF-8"))
            } as OutputStreamCallback)
            if (filename != null) {
                flowFile = session.putAttribute(flowFile, 'filename', filename)
            }
            session.transfer(flowFile, REL_SUCCESS)
            session.commit()
        } catch (e) {
            // Discard the half-built FlowFile instead of leaving the session uncommitted.
            session?.rollback()
            if (e instanceof ProcessException) {
                throw e
            }
            throw new ProcessException(e)
        }
    }

    // No cross-property checks; an empty collection means "no validation problems".
    @Override
    Collection<ValidationResult> validate(ValidationContext context) { return [] }

    @Override
    PropertyDescriptor getPropertyDescriptor(String name) {
        switch (name) {
            case 'File Content': return CONTENT
            case 'Evaluate Expressions in Content': return CONTENT_HAS_EL
            case 'Filename': return FILENAME
            default: return null
        }
    }

    @Override
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }

    @Override
    List<PropertyDescriptor> getPropertyDescriptors() { return [CONTENT, CONTENT_HAS_EL, FILENAME] }

    @Override
    String getIdentifier() { return 'GenerateFlowFile-InvokeScriptedProcessor' }
}

// InvokeScriptedProcessor picks up the Processor instance from the 'processor' variable.
processor = new GenerateFlowFileWithContent()]]></value></entry><entry><key>Module Directory</key></entry><entry><key>File Content</key><value>{
"field1": "Hello",
"field2": "World!"
}</value></entry><entry><key>Evaluate Expressions in Content</key><value>false</value></entry><entry><key>Filename</key><value>sample.atom</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>3 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>GenerateFlowFileWithContent</name><relationships><autoTerminate>false</autoTerminate><description>The flow file with the specified content and/or filename was successfully transferred</description><name>success</name></relationships><state>STOPPED</state><style/><supportsEventDriven>false</supportsEventDriven><supportsParallelProcessing>true</supportsParallelProcessing><type>org.apache.nifi.processors.script.InvokeScriptedProcessor</type></processors><!-- LogAttribute: last step of the flow. All of its properties are left unset, so per its descriptors it logs every attribute of each FlowFile (including the myKey attribute added by ExecuteScript). Its success relationship is auto-terminated. --><processors><id>3382f8b0-d0b0-41c6-bca7-8bd546a6be82</id><parentGroupId>08ae6f8e-de9b-4014-ab98-2c415b5e9d2a</parentGroupId><position><x>1955.0</x><y>444.0</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><defaultConcurrentTasks><entry><key>TIMER_DRIVEN</key><value>1</value></entry><entry><key>EVENT_DRIVEN</key><value>0</value></entry><entry><key>CRON_DRIVEN</key><value>1</value></entry></defaultConcurrentTasks><defaultSchedulingPeriod><entry><key>TIMER_DRIVEN</key><value>0 sec</value></entry><entry><key>CRON_DRIVEN</key><value>* * * * * ?</value></entry></defaultSchedulingPeriod><descriptors><entry><key>Log 
Level</key><value><allowableValues><displayName>trace</displayName><value>trace</value></allowableValues><allowableValues><displayName>debug</displayName><value>debug</value></allowableValues><allowableValues><displayName>info</displayName><value>info</value></allowableValues><allowableValues><displayName>warn</displayName><value>warn</value></allowableValues><allowableValues><displayName>error</displayName><value>error</value></allowableValues><defaultValue>info</defaultValue><description>The Log Level to use when logging the Attributes</description><displayName>Log Level</displayName><dynamic>false</dynamic><name>Log Level</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Log Payload</key><value><allowableValues><displayName>true</displayName><value>true</value></allowableValues><allowableValues><displayName>false</displayName><value>false</value></allowableValues><defaultValue>false</defaultValue><description>If true, the FlowFile's payload will be logged, in addition to its attributes; otherwise, just the Attributes will be logged.</description><displayName>Log Payload</displayName><dynamic>false</dynamic><name>Log Payload</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Attributes to Log</key><value><description>A comma-separated list of Attributes to Log. If not specified, all attributes will be logged.</description><displayName>Attributes to Log</displayName><dynamic>false</dynamic><name>Attributes to Log</name><required>false</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Attributes to Ignore</key><value><description>A comma-separated list of Attributes to ignore. 
If not specified, no attributes will be ignored.</description><displayName>Attributes to Ignore</displayName><dynamic>false</dynamic><name>Attributes to Ignore</name><required>false</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Log prefix</key><value><description>Log prefix appended to the log lines. It helps to distinguish the output of multiple LogAttribute processors.</description><displayName>Log prefix</displayName><dynamic>false</dynamic><name>Log prefix</name><required>false</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry></descriptors><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Log Level</key></entry><entry><key>Log Payload</key></entry><entry><key>Attributes to Log</key></entry><entry><key>Attributes to Ignore</key></entry><entry><key>Log prefix</key></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>LogAttribute</name><relationships><autoTerminate>true</autoTerminate><description>All FlowFiles are routed to this 
relationship</description><name>success</name></relationships><state>STOPPED</state><style/><supportsEventDriven>true</supportsEventDriven><supportsParallelProcessing>true</supportsParallelProcessing><type>org.apache.nifi.processors.standard.LogAttribute</type></processors><!-- ExecuteScript: runs an inline Groovy script that parses the JSON held in the kafka.key attribute and copies its data.myKey field into a new myKey attribute. --><processors><id>77b91aad-5389-4bdf-876b-5015faccfe9c</id><parentGroupId>08ae6f8e-de9b-4014-ab98-2c415b5e9d2a</parentGroupId><position><x>1484.0</x><y>439.0</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><defaultConcurrentTasks><entry><key>TIMER_DRIVEN</key><value>1</value></entry><entry><key>EVENT_DRIVEN</key><value>0</value></entry><entry><key>CRON_DRIVEN</key><value>1</value></entry></defaultConcurrentTasks><defaultSchedulingPeriod><entry><key>TIMER_DRIVEN</key><value>0 sec</value></entry><entry><key>CRON_DRIVEN</key><value>* * * * * ?</value></entry></defaultSchedulingPeriod><descriptors><entry><key>Script Engine</key><value><allowableValues><displayName>ECMAScript</displayName><value>ECMAScript</value></allowableValues><allowableValues><displayName>Groovy</displayName><value>Groovy</value></allowableValues><allowableValues><displayName>lua</displayName><value>lua</value></allowableValues><allowableValues><displayName>python</displayName><value>python</value></allowableValues><allowableValues><displayName>ruby</displayName><value>ruby</value></allowableValues><defaultValue>ECMAScript</defaultValue><description>The engine to execute scripts</description><displayName>Script Engine</displayName><dynamic>false</dynamic><name>Script Engine</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Script File</key><value><description>Path to script file to execute. 
Only one of Script File or Script Body may be used</description><displayName>Script File</displayName><dynamic>false</dynamic><name>Script File</name><required>false</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry><entry><key>Script Body</key><value><description>Body of script to execute. Only one of Script File or Script Body may be used</description><displayName>Script Body</displayName><dynamic>false</dynamic><name>Script Body</name><required>false</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Module Directory</key><value><description>Comma-separated list of paths to files and/or directories which contain modules required by the script.</description><displayName>Module Directory</displayName><dynamic>false</dynamic><name>Module Directory</name><required>false</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry></descriptors><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Script Engine</key><value>Groovy</value></entry><entry><key>Script File</key></entry><entry><key>Script Body</key><value><![CDATA[/*
 * Copies the "myKey" field nested under "data" in the JSON held by the kafka.key attribute
 * into a new myKey attribute, e.g. kafka.key = {"data": {"myKey": "myValue"}} gives myKey = myValue.
 * A FlowFile whose kafka.key is missing or empty, is not valid JSON, or has no data.myKey is
 * logged and routed to failure (auto-terminated in this template) instead of letting the
 * exception escape the script.
 */
import groovy.json.JsonOutput
import groovy.json.JsonSlurper

def flowfile = session.get()
if (!flowfile) return

try {
    def keyJson = flowfile.getAttribute('kafka.key')
    if (!keyJson) {
        throw new IllegalArgumentException('kafka.key attribute is missing or empty')
    }
    def json = new JsonSlurper().parseText(keyJson)
    // Accept only a JSON object whose "data" member is itself an object.
    def data = (json instanceof Map) ? json.data : null
    def myKey = (data instanceof Map) ? data.myKey : null
    if (myKey == null) {
        throw new IllegalArgumentException('kafka.key JSON has no data.myKey field')
    }
    // putAttribute takes a String: keep strings as-is, serialise numbers/booleans/objects back to JSON.
    def value = (myKey instanceof String) ? myKey : JsonOutput.toJson(myKey)
    flowfile = session.putAttribute(flowfile, 'myKey', value)
    session.transfer(flowfile, REL_SUCCESS)
} catch (Exception e) {
    log.error('Failed to extract data.myKey from the kafka.key attribute: ' + e.message, e)
    session.transfer(flowfile, REL_FAILURE)
}]]></value></entry><entry><key>Module Directory</key></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>ExecuteScript</name><relationships><autoTerminate>true</autoTerminate><description>FlowFiles that failed to be processed</description><name>failure</name></relationships><relationships><autoTerminate>false</autoTerminate><description>FlowFiles that were successfully processed</description><name>success</name></relationships><state>STOPPED</state><style/><supportsEventDriven>false</supportsEventDriven><supportsParallelProcessing>false</supportsParallelProcessing><type>org.apache.nifi.processors.script.ExecuteScript</type></processors><!-- UpdateAttribute: sets the kafka.key attribute to the sample JSON {"data": {"myKey": "myValue"}} before ExecuteScript parses it. --><processors><id>b9663547-8ca1-43e5-bd11-5f3ba3029b27</id><parentGroupId>08ae6f8e-de9b-4014-ab98-2c415b5e9d2a</parentGroupId><position><x>1016.0</x><y>438.0</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><defaultConcurrentTasks><entry><key>TIMER_DRIVEN</key><value>1</value></entry><entry><key>EVENT_DRIVEN</key><value>0</value></entry><entry><key>CRON_DRIVEN</key><value>1</value></entry></defaultConcurrentTasks><defaultSchedulingPeriod><entry><key>TIMER_DRIVEN</key><value>0 sec</value></entry><entry><key>CRON_DRIVEN</key><value>* * * * * ?</value></entry></defaultSchedulingPeriod><descriptors><entry><key>Delete Attributes Expression</key><value><description>Regular expression for attributes to be deleted from flowfiles.</description><displayName>Delete Attributes Expression</displayName><dynamic>false</dynamic><name>Delete Attributes 
Expression</name><required>false</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry><entry><key>kafka.key</key><value><description></description><displayName>kafka.key</displayName><dynamic>true</dynamic><name>kafka.key</name><required>false</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry></descriptors><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Delete Attributes Expression</key></entry><entry><key>kafka.key</key><value>{"data": {"myKey": "myValue"}}</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>UpdateAttribute</name><relationships><autoTerminate>false</autoTerminate><description>All FlowFiles are routed to this relationship</description><name>success</name></relationships><state>STOPPED</state><style/><supportsEventDriven>true</supportsEventDriven><supportsParallelProcessing>true</supportsParallelProcessing><type>org.apache.nifi.processors.attributes.UpdateAttribute</type></processors></snippet><timestamp>03/21/2016 13:51:47 EDT</timestamp></template>
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.