Skip to content

Instantly share code, notes, and snippets.

@mattyb149
Created February 25, 2016 02:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save mattyb149/0c87a1a6f1d98a43c8d0 to your computer and use it in GitHub Desktop.
Save mattyb149/0c87a1a6f1d98a43c8d0 to your computer and use it in GitHub Desktop.
A NiFi template using InvokeScriptedProcessor with Groovy to offer a GenerateFlowFile alternative that allows for setting the content and filename of a flow file
<?xml version="1.0" encoding="UTF-8" standalone="yes"?><template><description>This template can be used like a GenerateFlowFile processor, where the content and filename can be specified. It uses InvokeScriptedProcessor and Groovy to achieve this.</description><name>GenerateFlowFileWithContent</name><snippet><processors><id>8e949039-0779-458d-b825-32b09ef80b38</id><parentGroupId>d3d19b0c-5876-41a2-8b09-e5ed911dfa67</parentGroupId><position><x>-172.94315107850957</x><y>-64.62506581228018</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><defaultConcurrentTasks><entry><key>TIMER_DRIVEN</key><value>1</value></entry><entry><key>EVENT_DRIVEN</key><value>0</value></entry><entry><key>CRON_DRIVEN</key><value>1</value></entry></defaultConcurrentTasks><defaultSchedulingPeriod><entry><key>TIMER_DRIVEN</key><value>0 sec</value></entry><entry><key>CRON_DRIVEN</key><value>* * * * * ?</value></entry></defaultSchedulingPeriod><descriptors><entry><key>Script Engine</key><value><allowableValues><displayName>ECMAScript</displayName><value>ECMAScript</value></allowableValues><allowableValues><displayName>Groovy</displayName><value>Groovy</value></allowableValues><allowableValues><displayName>lua</displayName><value>lua</value></allowableValues><allowableValues><displayName>python</displayName><value>python</value></allowableValues><allowableValues><displayName>ruby</displayName><value>ruby</value></allowableValues><defaultValue>ECMAScript</defaultValue><description>The engine to execute scripts</description><displayName>Script Engine</displayName><dynamic>false</dynamic><name>Script Engine</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Script File</key><value><description>Path to script file to execute. Only one of Script File or Script Body may be used</description><displayName>Script File</displayName><dynamic>false</dynamic><name>Script File</name><required>false</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry><entry><key>Script Body</key><value><description>Body of script to execute. Only one of Script File or Script Body may be used</description><displayName>Script Body</displayName><dynamic>false</dynamic><name>Script Body</name><required>false</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Module Directory</key><value><description>Comma-separated list of paths to files and/or directories which contain modules required by the script.</description><displayName>Module Directory</displayName><dynamic>false</dynamic><name>Module Directory</name><required>false</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>File Content</key><value><description>The content for the generated flow file</description><displayName>File Content</displayName><dynamic>false</dynamic><name>File Content</name><required>false</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry><entry><key>Evaluate Expressions in Content</key><value><allowableValues><displayName>true</displayName><value>true</value></allowableValues><allowableValues><displayName>false</displayName><value>false</value></allowableValues><defaultValue>false</defaultValue><description>Whether to evaluate NiFi Expression Language constructs within the content</description><displayName>Evaluate Expressions in Content</displayName><dynamic>false</dynamic><name>Evaluate Expressions in Content</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Filename</key><value><description>The name of the flow file to be stored in the filename attribute</description><displayName>Filename</displayName><dynamic>false</dynamic><name>Filename</name><required>false</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry></descriptors><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Script Engine</key><value>Groovy</value></entry><entry><key>Script File</key></entry><entry><key>Script Body</key><value>class GenerateFlowFileWithContent implements Processor {
def REL_SUCCESS = new Relationship.Builder()
.name('success')
.description('The flow file with the specified content and/or filename was successfully transferred')
.build();
def CONTENT = new PropertyDescriptor.Builder()
.name('File Content').description('The content for the generated flow file')
.required(false).expressionLanguageSupported(true).addValidator(Validator.VALID).build()
def CONTENT_HAS_EL = new PropertyDescriptor.Builder()
.name('Evaluate Expressions in Content').description('Whether to evaluate NiFi Expression Language constructs within the content')
.required(true).allowableValues('true','false').defaultValue('false').build()
def FILENAME = new PropertyDescriptor.Builder()
.name('Filename').description('The name of the flow file to be stored in the filename attribute')
.required(false).expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build()
@Override
void initialize(ProcessorInitializationContext context) { }
@Override
Set&lt;Relationship&gt; getRelationships() { return [REL_SUCCESS] as Set }
@Override
void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
try {
def session = sessionFactory.createSession()
def flowFile = session.create()
def hasEL = context.getProperty(CONTENT_HAS_EL).asBoolean()
def contentProp = context.getProperty(CONTENT)
def content = (hasEL ? contentProp.evaluateAttributeExpressions().value : contentProp.value) ?: ''
def filename = context.getProperty(FILENAME)?.evaluateAttributeExpressions()?.getValue()
flowFile = session.write(flowFile, { outStream -&gt;
outStream.write(content.getBytes(&quot;UTF-8&quot;))
} as OutputStreamCallback)
if(filename != null) { flowFile = session.putAttribute(flowFile, 'filename', filename) }
// transfer
session.transfer(flowFile, REL_SUCCESS)
session.commit()
} catch(e) {
throw new ProcessException(e)
}
}
@Override
Collection&lt;ValidationResult&gt; validate(ValidationContext context) { return null }
@Override
PropertyDescriptor getPropertyDescriptor(String name) {
switch(name) {
case 'File Content': return CONTENT
case 'Evaluate Expressions in Content': return CONTENT_HAS_EL
case 'Filename': return FILENAME
default: return null
}
}
@Override
void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
@Override
List&lt;PropertyDescriptor&gt; getPropertyDescriptors() { return [CONTENT, CONTENT_HAS_EL, FILENAME] as List }
@Override
String getIdentifier() { return 'GenerateFlowFile-InvokeScriptedProcessor' }
}
processor = new GenerateFlowFileWithContent()</value></entry><entry><key>Module Directory</key></entry><entry><key>File Content</key></entry><entry><key>Evaluate Expressions in Content</key><value>false</value></entry><entry><key>Filename</key></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>3 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>GenerateFlowFileWithContent</name><relationships><autoTerminate>false</autoTerminate><description>The flow file with the specified content and/or filename was successfully transferred</description><name>success</name></relationships><state>STOPPED</state><style/><supportsEventDriven>false</supportsEventDriven><supportsParallelProcessing>true</supportsParallelProcessing><type>org.apache.nifi.processors.script.InvokeScriptedProcessor</type></processors></snippet><timestamp>02/24/2016 17:34:25 EST</timestamp></template>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment