Skip to content

Instantly share code, notes, and snippets.

@mattyb149
Created March 21, 2016 17:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mattyb149/478864017ec70d76f74f to your computer and use it in GitHub Desktop.
Save mattyb149/478864017ec70d76f74f to your computer and use it in GitHub Desktop.
A NiFi template that uses Groovy to parse an attribute containing JSON and create a new attribute from one of the JSON fields
<?xml version="1.0" encoding="UTF-8" standalone="yes"?><template><description></description><name>ParseJsonInAttribute</name><snippet><connections><id>906cd6cd-343f-45d6-8e08-6306ef089773</id><parentGroupId>08ae6f8e-de9b-4014-ab98-2c415b5e9d2a</parentGroupId><backPressureDataSizeThreshold>0 MB</backPressureDataSizeThreshold><backPressureObjectThreshold>0</backPressureObjectThreshold><destination><groupId>08ae6f8e-de9b-4014-ab98-2c415b5e9d2a</groupId><id>3382f8b0-d0b0-41c6-bca7-8bd546a6be82</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>success</selectedRelationships><source><groupId>08ae6f8e-de9b-4014-ab98-2c415b5e9d2a</groupId><id>77b91aad-5389-4bdf-876b-5015faccfe9c</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>5b1376bf-e6d6-456c-9e83-444f554b25d8</id><parentGroupId>08ae6f8e-de9b-4014-ab98-2c415b5e9d2a</parentGroupId><backPressureDataSizeThreshold>0 MB</backPressureDataSizeThreshold><backPressureObjectThreshold>0</backPressureObjectThreshold><destination><groupId>08ae6f8e-de9b-4014-ab98-2c415b5e9d2a</groupId><id>b9663547-8ca1-43e5-bd11-5f3ba3029b27</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>success</selectedRelationships><source><groupId>08ae6f8e-de9b-4014-ab98-2c415b5e9d2a</groupId><id>947b45b0-37cd-473d-b5cf-7fbd3f143da7</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>cd8aaf0e-2410-4c95-906e-d67e12168e3f</id><parentGroupId>08ae6f8e-de9b-4014-ab98-2c415b5e9d2a</parentGroupId><backPressureDataSizeThreshold>0 MB</backPressureDataSizeThreshold><backPressureObjectThreshold>0</backPressureObjectThreshold><destination><groupId>08ae6f8e-de9b-4014-ab98-2c415b5e9d2a</groupId><id>77b91aad-5389-4bdf-876b-5015faccfe9c</id><type>PROCESSOR</type></destination><flowFileExpiration>0 
sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>success</selectedRelationships><source><groupId>08ae6f8e-de9b-4014-ab98-2c415b5e9d2a</groupId><id>b9663547-8ca1-43e5-bd11-5f3ba3029b27</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><processors><id>947b45b0-37cd-473d-b5cf-7fbd3f143da7</id><parentGroupId>08ae6f8e-de9b-4014-ab98-2c415b5e9d2a</parentGroupId><position><x>593.0</x><y>441.0</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><defaultConcurrentTasks><entry><key>TIMER_DRIVEN</key><value>1</value></entry><entry><key>EVENT_DRIVEN</key><value>0</value></entry><entry><key>CRON_DRIVEN</key><value>1</value></entry></defaultConcurrentTasks><defaultSchedulingPeriod><entry><key>TIMER_DRIVEN</key><value>0 sec</value></entry><entry><key>CRON_DRIVEN</key><value>* * * * * ?</value></entry></defaultSchedulingPeriod><descriptors><entry><key>Script Engine</key><value><allowableValues><displayName>ECMAScript</displayName><value>ECMAScript</value></allowableValues><allowableValues><displayName>Groovy</displayName><value>Groovy</value></allowableValues><allowableValues><displayName>lua</displayName><value>lua</value></allowableValues><allowableValues><displayName>python</displayName><value>python</value></allowableValues><allowableValues><displayName>ruby</displayName><value>ruby</value></allowableValues><defaultValue>ECMAScript</defaultValue><description>The engine to execute scripts</description><displayName>Script Engine</displayName><dynamic>false</dynamic><name>Script Engine</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Script File</key><value><description>Path to script file to execute. 
Only one of Script File or Script Body may be used</description><displayName>Script File</displayName><dynamic>false</dynamic><name>Script File</name><required>false</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry><entry><key>Script Body</key><value><description>Body of script to execute. Only one of Script File or Script Body may be used</description><displayName>Script Body</displayName><dynamic>false</dynamic><name>Script Body</name><required>false</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Module Directory</key><value><description>Comma-separated list of paths to files and/or directories which contain modules required by the script.</description><displayName>Module Directory</displayName><dynamic>false</dynamic><name>Module Directory</name><required>false</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>File Content</key><value><description>The content for the generated flow file</description><displayName>File Content</displayName><dynamic>false</dynamic><name>File Content</name><required>false</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry><entry><key>Evaluate Expressions in Content</key><value><allowableValues><displayName>true</displayName><value>true</value></allowableValues><allowableValues><displayName>false</displayName><value>false</value></allowableValues><defaultValue>false</defaultValue><description>Whether to evaluate NiFi Expression Language constructs within the content</description><displayName>Evaluate Expressions in Content</displayName><dynamic>false</dynamic><name>Evaluate Expressions in Content</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Filename</key><value><description>The name of the flow file to be stored in the filename 
attribute</description><displayName>Filename</displayName><dynamic>false</dynamic><name>Filename</name><required>false</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry></descriptors><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Script Engine</key><value>Groovy</value></entry><entry><key>Script File</key></entry><entry><key>Script Body</key><value>class GenerateFlowFileWithContent implements Processor {
// Relationship that receives every flow file generated by this processor.
def REL_SUCCESS = new Relationship.Builder().name('success')
.description('The flow file with the specified content and/or filename was successfully transferred')
.build()
// Optional text to write as the flow file content; Expression Language is supported.
def CONTENT = new PropertyDescriptor.Builder().name('File Content')
.description('The content for the generated flow file')
.required(false)
.expressionLanguageSupported(true)
.addValidator(Validator.VALID)
.build()
// Required true/false switch (default 'false') read by onTrigger to decide whether the content is evaluated.
def CONTENT_HAS_EL = new PropertyDescriptor.Builder().name('Evaluate Expressions in Content')
.description('Whether to evaluate NiFi Expression Language constructs within the content')
.required(true)
.allowableValues('true','false')
.defaultValue('false')
.build()
// Optional value for the 'filename' attribute; Expression Language is supported.
def FILENAME = new PropertyDescriptor.Builder().name('Filename')
.description('The name of the flow file to be stored in the filename attribute')
.required(false)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build()
// Intentionally a no-op: this processor keeps no state that needs initializing.
@Override
void initialize(ProcessorInitializationContext context) {
}
// Exposes the single 'success' relationship as a set.
@Override
Set&lt;Relationship&gt; getRelationships() {
  def relationships = [REL_SUCCESS] as Set
  return relationships
}
/**
 * Creates one flow file per invocation, writes the configured content into it
 * (UTF-8 encoded), optionally sets its 'filename' attribute, and transfers it
 * to the success relationship.
 *
 * @param context        supplies the 'File Content', 'Evaluate Expressions in Content'
 *                       and 'Filename' property values
 * @param sessionFactory used to create the session that owns the new flow file
 * @throws ProcessException wrapping any exception raised while building or
 *                          transferring the flow file; the session is rolled back first
 */
@Override
void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
  // Declared outside the try block so the catch clause can roll the session back.
  def session = null
  try {
    session = sessionFactory.createSession()
    def flowFile = session.create()
    def hasEL = context.getProperty(CONTENT_HAS_EL).asBoolean()
    def contentProp = context.getProperty(CONTENT)
    // Fall back to an empty string when no content is configured.
    def content = (hasEL ? contentProp.evaluateAttributeExpressions().value : contentProp.value) ?: ''
    def filename = context.getProperty(FILENAME)?.evaluateAttributeExpressions()?.getValue()
    flowFile = session.write(flowFile, { outStream -&gt;
      outStream.write(content.getBytes('UTF-8'))
    } as OutputStreamCallback)
    // Only override the filename attribute when the property is set.
    if(filename != null) { flowFile = session.putAttribute(flowFile, 'filename', filename) }
    // transfer
    session.transfer(flowFile, REL_SUCCESS)
    session.commit()
  } catch(e) {
    // Discard the partially built flow file instead of leaving it in an uncommitted session.
    session?.rollback()
    throw new ProcessException(e)
  }
}
/**
 * Performs no custom validation.
 *
 * @param context the validation context (unused)
 * @return an empty collection, meaning "no invalid findings", rather than null
 *         so callers can iterate the result without a null check
 */
@Override
Collection&lt;ValidationResult&gt; validate(ValidationContext context) { return [] }
// Looks up one of the three supported descriptors by property name; unknown names yield null.
@Override
PropertyDescriptor getPropertyDescriptor(String name) {
  def descriptorsByName = [
    'File Content': CONTENT,
    'Evaluate Expressions in Content': CONTENT_HAS_EL,
    'Filename': FILENAME
  ]
  return descriptorsByName[name]
}
// Intentionally a no-op: nothing is done when a property value changes.
@Override
void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
}
// Lists the supported properties: content, the expression-evaluation switch, then filename.
@Override
List&lt;PropertyDescriptor&gt; getPropertyDescriptors() {
  def supported = [CONTENT, CONTENT_HAS_EL, FILENAME] as List
  return supported
}
// Fixed identifier reported for this scripted processor.
@Override
String getIdentifier() {
  'GenerateFlowFile-InvokeScriptedProcessor'
}
}
processor = new GenerateFlowFileWithContent()</value></entry><entry><key>Module Directory</key></entry><entry><key>File Content</key><value>{
&quot;field1&quot;: &quot;Hello&quot;,
&quot;field2&quot;: &quot;World!&quot;
}</value></entry><entry><key>Evaluate Expressions in Content</key><value>false</value></entry><entry><key>Filename</key><value>sample.atom</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>3 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>GenerateFlowFileWithContent</name><relationships><autoTerminate>false</autoTerminate><description>The flow file with the specified content and/or filename was successfully transferred</description><name>success</name></relationships><state>STOPPED</state><style/><supportsEventDriven>false</supportsEventDriven><supportsParallelProcessing>true</supportsParallelProcessing><type>org.apache.nifi.processors.script.InvokeScriptedProcessor</type></processors><processors><id>3382f8b0-d0b0-41c6-bca7-8bd546a6be82</id><parentGroupId>08ae6f8e-de9b-4014-ab98-2c415b5e9d2a</parentGroupId><position><x>1955.0</x><y>444.0</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><defaultConcurrentTasks><entry><key>TIMER_DRIVEN</key><value>1</value></entry><entry><key>EVENT_DRIVEN</key><value>0</value></entry><entry><key>CRON_DRIVEN</key><value>1</value></entry></defaultConcurrentTasks><defaultSchedulingPeriod><entry><key>TIMER_DRIVEN</key><value>0 sec</value></entry><entry><key>CRON_DRIVEN</key><value>* * * * * ?</value></entry></defaultSchedulingPeriod><descriptors><entry><key>Log 
Level</key><value><allowableValues><displayName>trace</displayName><value>trace</value></allowableValues><allowableValues><displayName>debug</displayName><value>debug</value></allowableValues><allowableValues><displayName>info</displayName><value>info</value></allowableValues><allowableValues><displayName>warn</displayName><value>warn</value></allowableValues><allowableValues><displayName>error</displayName><value>error</value></allowableValues><defaultValue>info</defaultValue><description>The Log Level to use when logging the Attributes</description><displayName>Log Level</displayName><dynamic>false</dynamic><name>Log Level</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Log Payload</key><value><allowableValues><displayName>true</displayName><value>true</value></allowableValues><allowableValues><displayName>false</displayName><value>false</value></allowableValues><defaultValue>false</defaultValue><description>If true, the FlowFile's payload will be logged, in addition to its attributes; otherwise, just the Attributes will be logged.</description><displayName>Log Payload</displayName><dynamic>false</dynamic><name>Log Payload</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Attributes to Log</key><value><description>A comma-separated list of Attributes to Log. If not specified, all attributes will be logged.</description><displayName>Attributes to Log</displayName><dynamic>false</dynamic><name>Attributes to Log</name><required>false</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Attributes to Ignore</key><value><description>A comma-separated list of Attributes to ignore. 
If not specified, no attributes will be ignored.</description><displayName>Attributes to Ignore</displayName><dynamic>false</dynamic><name>Attributes to Ignore</name><required>false</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Log prefix</key><value><description>Log prefix appended to the log lines. It helps to distinguish the output of multiple LogAttribute processors.</description><displayName>Log prefix</displayName><dynamic>false</dynamic><name>Log prefix</name><required>false</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry></descriptors><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Log Level</key></entry><entry><key>Log Payload</key></entry><entry><key>Attributes to Log</key></entry><entry><key>Attributes to Ignore</key></entry><entry><key>Log prefix</key></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>LogAttribute</name><relationships><autoTerminate>true</autoTerminate><description>All FlowFiles are routed to this 
relationship</description><name>success</name></relationships><state>STOPPED</state><style/><supportsEventDriven>true</supportsEventDriven><supportsParallelProcessing>true</supportsParallelProcessing><type>org.apache.nifi.processors.standard.LogAttribute</type></processors><processors><id>77b91aad-5389-4bdf-876b-5015faccfe9c</id><parentGroupId>08ae6f8e-de9b-4014-ab98-2c415b5e9d2a</parentGroupId><position><x>1484.0</x><y>439.0</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><defaultConcurrentTasks><entry><key>TIMER_DRIVEN</key><value>1</value></entry><entry><key>EVENT_DRIVEN</key><value>0</value></entry><entry><key>CRON_DRIVEN</key><value>1</value></entry></defaultConcurrentTasks><defaultSchedulingPeriod><entry><key>TIMER_DRIVEN</key><value>0 sec</value></entry><entry><key>CRON_DRIVEN</key><value>* * * * * ?</value></entry></defaultSchedulingPeriod><descriptors><entry><key>Script Engine</key><value><allowableValues><displayName>ECMAScript</displayName><value>ECMAScript</value></allowableValues><allowableValues><displayName>Groovy</displayName><value>Groovy</value></allowableValues><allowableValues><displayName>lua</displayName><value>lua</value></allowableValues><allowableValues><displayName>python</displayName><value>python</value></allowableValues><allowableValues><displayName>ruby</displayName><value>ruby</value></allowableValues><defaultValue>ECMAScript</defaultValue><description>The engine to execute scripts</description><displayName>Script Engine</displayName><dynamic>false</dynamic><name>Script Engine</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Script File</key><value><description>Path to script file to execute. 
Only one of Script File or Script Body may be used</description><displayName>Script File</displayName><dynamic>false</dynamic><name>Script File</name><required>false</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry><entry><key>Script Body</key><value><description>Body of script to execute. Only one of Script File or Script Body may be used</description><displayName>Script Body</displayName><dynamic>false</dynamic><name>Script Body</name><required>false</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Module Directory</key><value><description>Comma-separated list of paths to files and/or directories which contain modules required by the script.</description><displayName>Module Directory</displayName><dynamic>false</dynamic><name>Module Directory</name><required>false</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry></descriptors><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Script Engine</key><value>Groovy</value></entry><entry><key>Script File</key></entry><entry><key>Script Body</key><value>import groovy.json.JsonSlurper
// Pull the next queued flow file; there is nothing to do when the queue is empty.
def flowfile = session.get()
if(!flowfile) return
try {
  // The 'kafka.key' attribute is expected to hold JSON with a nested data.myKey field.
  def rawJson = flowfile.getAttribute('kafka.key')
  def myKey = rawJson ? new JsonSlurper().parseText(rawJson)?.data?.myKey : null
  if(myKey == null) throw new IllegalArgumentException('kafka.key attribute is missing or has no data.myKey field')
  // Attribute values must be strings, so non-string JSON values are converted.
  flowfile = session.putAttribute(flowfile, 'myKey', myKey.toString())
} catch(e) {
  // Route unparseable input to failure instead of letting the exception roll the
  // session back, which would retry the same flow file indefinitely.
  log.error('Unable to extract data.myKey from the kafka.key attribute', e)
  session.transfer(flowfile, REL_FAILURE)
  return
}
session.transfer(flowfile, REL_SUCCESS)</value></entry><entry><key>Module Directory</key></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>ExecuteScript</name><relationships><autoTerminate>true</autoTerminate><description>FlowFiles that failed to be processed</description><name>failure</name></relationships><relationships><autoTerminate>false</autoTerminate><description>FlowFiles that were successfully processed</description><name>success</name></relationships><state>STOPPED</state><style/><supportsEventDriven>false</supportsEventDriven><supportsParallelProcessing>false</supportsParallelProcessing><type>org.apache.nifi.processors.script.ExecuteScript</type></processors><processors><id>b9663547-8ca1-43e5-bd11-5f3ba3029b27</id><parentGroupId>08ae6f8e-de9b-4014-ab98-2c415b5e9d2a</parentGroupId><position><x>1016.0</x><y>438.0</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><defaultConcurrentTasks><entry><key>TIMER_DRIVEN</key><value>1</value></entry><entry><key>EVENT_DRIVEN</key><value>0</value></entry><entry><key>CRON_DRIVEN</key><value>1</value></entry></defaultConcurrentTasks><defaultSchedulingPeriod><entry><key>TIMER_DRIVEN</key><value>0 sec</value></entry><entry><key>CRON_DRIVEN</key><value>* * * * * ?</value></entry></defaultSchedulingPeriod><descriptors><entry><key>Delete Attributes Expression</key><value><description>Regular expression for attributes to be deleted from flowfiles.</description><displayName>Delete Attributes Expression</displayName><dynamic>false</dynamic><name>Delete Attributes 
Expression</name><required>false</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry><entry><key>kafka.key</key><value><description></description><displayName>kafka.key</displayName><dynamic>true</dynamic><name>kafka.key</name><required>false</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry></descriptors><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Delete Attributes Expression</key></entry><entry><key>kafka.key</key><value>{&quot;data&quot;: {&quot;myKey&quot;: &quot;myValue&quot;}}</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>UpdateAttribute</name><relationships><autoTerminate>false</autoTerminate><description>All FlowFiles are routed to this relationship</description><name>success</name></relationships><state>STOPPED</state><style/><supportsEventDriven>true</supportsEventDriven><supportsParallelProcessing>true</supportsParallelProcessing><type>org.apache.nifi.processors.attributes.UpdateAttribute</type></processors></snippet><timestamp>03/21/2016 13:51:47 EDT</timestamp></template>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment