Created
February 25, 2016 02:28
-
-
Save mattyb149/0c87a1a6f1d98a43c8d0 to your computer and use it in GitHub Desktop.
A NiFi template that uses InvokeScriptedProcessor with a Groovy script as an alternative to GenerateFlowFile, allowing both the content and the filename of the generated flow file to be set.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8" standalone="yes"?><template><description>This template can be used like a GenerateFlowFile processor, where the content and filename can be specified. It uses InvokeScriptedProcessor and Groovy to achieve this.</description><name>GenerateFlowFileWithContent</name><snippet><processors><id>8e949039-0779-458d-b825-32b09ef80b38</id><parentGroupId>d3d19b0c-5876-41a2-8b09-e5ed911dfa67</parentGroupId><position><x>-172.94315107850957</x><y>-64.62506581228018</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><defaultConcurrentTasks><entry><key>TIMER_DRIVEN</key><value>1</value></entry><entry><key>EVENT_DRIVEN</key><value>0</value></entry><entry><key>CRON_DRIVEN</key><value>1</value></entry></defaultConcurrentTasks><defaultSchedulingPeriod><entry><key>TIMER_DRIVEN</key><value>0 sec</value></entry><entry><key>CRON_DRIVEN</key><value>* * * * * ?</value></entry></defaultSchedulingPeriod><descriptors><entry><key>Script Engine</key><value><allowableValues><displayName>ECMAScript</displayName><value>ECMAScript</value></allowableValues><allowableValues><displayName>Groovy</displayName><value>Groovy</value></allowableValues><allowableValues><displayName>lua</displayName><value>lua</value></allowableValues><allowableValues><displayName>python</displayName><value>python</value></allowableValues><allowableValues><displayName>ruby</displayName><value>ruby</value></allowableValues><defaultValue>ECMAScript</defaultValue><description>The engine to execute scripts</description><displayName>Script Engine</displayName><dynamic>false</dynamic><name>Script Engine</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Script File</key><value><description>Path to script file to execute. 
Only one of Script File or Script Body may be used</description><displayName>Script File</displayName><dynamic>false</dynamic><name>Script File</name><required>false</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry><entry><key>Script Body</key><value><description>Body of script to execute. Only one of Script File or Script Body may be used</description><displayName>Script Body</displayName><dynamic>false</dynamic><name>Script Body</name><required>false</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Module Directory</key><value><description>Comma-separated list of paths to files and/or directories which contain modules required by the script.</description><displayName>Module Directory</displayName><dynamic>false</dynamic><name>Module Directory</name><required>false</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>File Content</key><value><description>The content for the generated flow file</description><displayName>File Content</displayName><dynamic>false</dynamic><name>File Content</name><required>false</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry><entry><key>Evaluate Expressions in Content</key><value><allowableValues><displayName>true</displayName><value>true</value></allowableValues><allowableValues><displayName>false</displayName><value>false</value></allowableValues><defaultValue>false</defaultValue><description>Whether to evaluate NiFi Expression Language constructs within the content</description><displayName>Evaluate Expressions in Content</displayName><dynamic>false</dynamic><name>Evaluate Expressions in Content</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Filename</key><value><description>The name of the flow file to be stored in the filename 
attribute</description><displayName>Filename</displayName><dynamic>false</dynamic><name>Filename</name><required>false</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry></descriptors><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Script Engine</key><value>Groovy</value></entry><entry><key>Script File</key></entry><entry><key>Script Body</key><value>class GenerateFlowFileWithContent implements Processor { | |
// The only relationship this processor exposes; onTrigger transfers the generated flow file to it.
def REL_SUCCESS = new Relationship.Builder().name('success')
        .description('The flow file with the specified content and/or filename was successfully transferred')
        .build()
// Optional property holding the text written as the flow file content.
// Expression Language is enabled and any value is accepted (Validator.VALID).
def CONTENT = new PropertyDescriptor.Builder()
        .name('File Content')
        .description('The content for the generated flow file')
        .required(false)
        .expressionLanguageSupported(true)
        .addValidator(Validator.VALID)
        .build()
// Required true/false switch (default 'false') that onTrigger reads to decide
// whether the File Content value is run through Expression Language evaluation.
def CONTENT_HAS_EL = new PropertyDescriptor.Builder()
        .name('Evaluate Expressions in Content')
        .description('Whether to evaluate NiFi Expression Language constructs within the content')
        .required(true)
        .allowableValues('true', 'false')
        .defaultValue('false')
        .build()
// Optional property whose evaluated value, when present, is stored in the
// 'filename' attribute of the generated flow file. Must be non-empty if set.
def FILENAME = new PropertyDescriptor.Builder()
        .name('Filename')
        .description('The name of the flow file to be stored in the filename attribute')
        .required(false)
        .expressionLanguageSupported(true)
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .build()
/**
 * Initialization hook required by the Processor interface.
 * Nothing is done here: the relationship and property descriptors are
 * created by the field initializers of this class.
 *
 * @param context initialization context (unused)
 */
@Override
void initialize(ProcessorInitializationContext context) {
    // intentionally empty
}
/**
 * Lists the relationships this processor can route to.
 *
 * @return a set containing only REL_SUCCESS
 */
@Override
Set<Relationship> getRelationships() {
    def relationships = [REL_SUCCESS] as Set
    relationships
}
/**
 * Creates one new flow file per invocation, writes the configured content
 * into it, optionally sets its 'filename' attribute, and transfers it to
 * REL_SUCCESS.
 *
 * The session is obtained from the factory, so this method owns its whole
 * life cycle: it commits on success and rolls back on any failure so that a
 * partially built flow file is never left attached to an open session.
 *
 * @param context        supplies the File Content, Evaluate Expressions in
 *                       Content and Filename property values
 * @param sessionFactory used to create the session for this invocation
 * @throws ProcessException if anything fails; a ProcessException raised by
 *                          the framework is rethrown as-is, any other
 *                          exception is wrapped in a ProcessException
 */
@Override
void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
    // Declared outside the try block so the catch clauses can roll it back.
    def session = null
    try {
        session = sessionFactory.createSession()
        def flowFile = session.create()

        def hasEL = context.getProperty(CONTENT_HAS_EL).asBoolean()
        def contentProp = context.getProperty(CONTENT)
        // Fall back to empty content when the property is unset or evaluates to nothing.
        def content = (hasEL ? contentProp.evaluateAttributeExpressions().value : contentProp.value) ?: ''
        def filename = context.getProperty(FILENAME)?.evaluateAttributeExpressions()?.getValue()

        flowFile = session.write(flowFile, { outStream ->
            outStream.write(content.getBytes("UTF-8"))
        } as OutputStreamCallback)

        // Only override the 'filename' attribute when a value was configured.
        if (filename != null) {
            flowFile = session.putAttribute(flowFile, 'filename', filename)
        }

        session.transfer(flowFile, REL_SUCCESS)
        session.commit()
    } catch (ProcessException pe) {
        // Already the declared exception type: undo the session and rethrow without re-wrapping.
        session?.rollback(true)
        throw pe
    } catch (e) {
        // Undo the created flow file before reporting the failure.
        session?.rollback(true)
        throw new ProcessException(e)
    }
}
/**
 * Performs no script-specific validation.
 *
 * An empty collection, not null, is returned to signal "no invalid
 * findings", so callers can iterate or merge the result without a null check.
 *
 * @param context validation context (unused)
 * @return an empty, mutable collection of validation results
 */
@Override
Collection<ValidationResult> validate(ValidationContext context) {
    return []
}
/**
 * Resolves a property name to its descriptor.
 *
 * @param name the property name to look up
 * @return CONTENT, CONTENT_HAS_EL or FILENAME for their respective names,
 *         or null when the name matches none of them
 */
@Override
PropertyDescriptor getPropertyDescriptor(String name) {
    def descriptorsByName = [
        'File Content'                   : CONTENT,
        'Evaluate Expressions in Content': CONTENT_HAS_EL,
        'Filename'                       : FILENAME
    ]
    // A missing key yields null, matching the "unknown property" result.
    return descriptorsByName.get(name)
}
/**
 * Property-change hook required by the Processor interface.
 * No action is taken: onTrigger reads the property values from the
 * ProcessContext on each invocation.
 *
 * @param descriptor the property that changed (unused)
 * @param oldValue   previous value (unused)
 * @param newValue   new value (unused)
 */
@Override
void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
    // intentionally empty
}
/**
 * Lists the properties this processor supports.
 *
 * @return CONTENT, CONTENT_HAS_EL and FILENAME, in that order
 */
@Override
List<PropertyDescriptor> getPropertyDescriptors() {
    [CONTENT, CONTENT_HAS_EL, FILENAME]
}
/**
 * Returns the fixed identifier string of this scripted processor.
 *
 * @return the constant 'GenerateFlowFile-InvokeScriptedProcessor'
 */
@Override
String getIdentifier() {
    'GenerateFlowFile-InvokeScriptedProcessor'
}
} | |
processor = new GenerateFlowFileWithContent()</value></entry><entry><key>Module Directory</key></entry><entry><key>File Content</key></entry><entry><key>Evaluate Expressions in Content</key><value>false</value></entry><entry><key>Filename</key></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>3 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>GenerateFlowFileWithContent</name><relationships><autoTerminate>false</autoTerminate><description>The flow file with the specified content and/or filename was successfully transferred</description><name>success</name></relationships><state>STOPPED</state><style/><supportsEventDriven>false</supportsEventDriven><supportsParallelProcessing>true</supportsParallelProcessing><type>org.apache.nifi.processors.script.InvokeScriptedProcessor</type></processors></snippet><timestamp>02/24/2016 17:34:25 EST</timestamp></template> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment