Skip to content

Instantly share code, notes, and snippets.

@mattyb149
Created July 7, 2016 15:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mattyb149/a25bdda16f7766fa2a4e86386b40ee0d to your computer and use it in GitHub Desktop.
Save mattyb149/a25bdda16f7766fa2a4e86386b40ee0d to your computer and use it in GitHub Desktop.
NiFi template that takes JSON input, routes on a value, then converts to a Cassandra Query Language (CQL) statement
<?xml version="1.0" encoding="UTF-8" standalone="yes"?><template encoding-version="1.0"><description></description><groupId>6399b847-98bb-4646-baa1-65523b25a7f1</groupId><name>JsonToCQL</name><snippet><connections><id>945cd514-84c9-4d54-8335-315d4dfd79e7</id><parentGroupId>6399b847-98bb-4646-baa1-65523b25a7f1</parentGroupId><backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold><backPressureObjectThreshold>10000</backPressureObjectThreshold><destination><groupId>6399b847-98bb-4646-baa1-65523b25a7f1</groupId><id>28324be6-b7ca-4a88-95d4-e7eff0e62eb2</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>success</selectedRelationships><source><groupId>6399b847-98bb-4646-baa1-65523b25a7f1</groupId><id>3293abf1-1b9d-4973-b51d-4f2e6899f18d</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>a795d09a-5222-41b7-a155-d899f9254f2e</id><parentGroupId>6399b847-98bb-4646-baa1-65523b25a7f1</parentGroupId><backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold><backPressureObjectThreshold>10000</backPressureObjectThreshold><destination><groupId>6399b847-98bb-4646-baa1-65523b25a7f1</groupId><id>3293abf1-1b9d-4973-b51d-4f2e6899f18d</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>success</selectedRelationships><source><groupId>6399b847-98bb-4646-baa1-65523b25a7f1</groupId><id>fbb9aaf6-f731-45f0-aea0-04f3af6e0d23</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>e4efef99-31e0-4b53-8ff6-74cc34d12fcd</id><parentGroupId>6399b847-98bb-4646-baa1-65523b25a7f1</parentGroupId><backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold><backPressureObjectThreshold>10000</backPressureObjectThreshold><destination><groupId>6399b847-98bb-4646-baa1-65523b25a7f1</groupId><id>b8f08bda-66e7-4fd0-af60-d93a8bce05ff</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>success</selectedRelationships><source><groupId>6399b847-98bb-4646-baa1-65523b25a7f1</groupId><id>8d387176-e75a-48bf-9277-ff5553b4bad6</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>0fc2a7a1-d3f2-4941-a392-9130ed773afb</id><parentGroupId>6399b847-98bb-4646-baa1-65523b25a7f1</parentGroupId><backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold><backPressureObjectThreshold>10000</backPressureObjectThreshold><destination><groupId>6399b847-98bb-4646-baa1-65523b25a7f1</groupId><id>d7775510-4547-44e6-84cb-4a9ef1670009</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>matched</selectedRelationships><source><groupId>6399b847-98bb-4646-baa1-65523b25a7f1</groupId><id>b8f08bda-66e7-4fd0-af60-d93a8bce05ff</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>fcaca840-7ded-43da-87f2-1f97717f4e8d</id><parentGroupId>6399b847-98bb-4646-baa1-65523b25a7f1</parentGroupId><backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold><backPressureObjectThreshold>10000</backPressureObjectThreshold><destination><groupId>6399b847-98bb-4646-baa1-65523b25a7f1</groupId><id>fbb9aaf6-f731-45f0-aea0-04f3af6e0d23</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>matched</selectedRelationships><source><groupId>6399b847-98bb-4646-baa1-65523b25a7f1</groupId><id>d7775510-4547-44e6-84cb-4a9ef1670009</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><processors><id>fbb9aaf6-f731-45f0-aea0-04f3af6e0d23</id><parentGroupId>6399b847-98bb-4646-baa1-65523b25a7f1</parentGroupId><position><x>1960.7977699604194</x><y>931.2751738177224</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Regular Expression</key><value>[\r\n]</value></entry><entry><key>Replacement Value</key><value></value></entry><entry><key>Character Set</key></entry><entry><key>Maximum Buffer Size</key></entry><entry><key>Replacement Strategy</key></entry><entry><key>Evaluation Mode</key></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>ReplaceText</name><relationships><autoTerminate>true</autoTerminate><name>failure</name></relationships><relationships><autoTerminate>false</autoTerminate><name>success</name></relationships><style/><type>org.apache.nifi.processors.standard.ReplaceText</type></processors><processors><id>3293abf1-1b9d-4973-b51d-4f2e6899f18d</id><parentGroupId>6399b847-98bb-4646-baa1-65523b25a7f1</parentGroupId><position><x>1955.9090980854194</x><y>726.2798124895974</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Regular Expression</key></entry><entry><key>Replacement Value</key><value>INSERT INTO myTable JSON '$0'</value></entry><entry><key>Character Set</key></entry><entry><key>Maximum Buffer Size</key></entry><entry><key>Replacement Strategy</key></entry><entry><key>Evaluation Mode</key></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>ReplaceText</name><relationships><autoTerminate>true</autoTerminate><name>failure</name></relationships><relationships><autoTerminate>false</autoTerminate><name>success</name></relationships><style/><type>org.apache.nifi.processors.standard.ReplaceText</type></processors><processors><id>b8f08bda-66e7-4fd0-af60-d93a8bce05ff</id><parentGroupId>6399b847-98bb-4646-baa1-65523b25a7f1</parentGroupId><position><x>1482.2900197438173</x><y>729.2965188257252</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Destination</key><value>flowfile-attribute</value></entry><entry><key>Return Type</key></entry><entry><key>Path Not Found Behavior</key></entry><entry><key>Null Value Representation</key></entry><entry><key>event.name</key><value>$.eventname</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>EvaluateJsonPath</name><relationships><autoTerminate>true</autoTerminate><name>failure</name></relationships><relationships><autoTerminate>false</autoTerminate><name>matched</name></relationships><relationships><autoTerminate>true</autoTerminate><name>unmatched</name></relationships><style/><type>org.apache.nifi.processors.standard.EvaluateJsonPath</type></processors><processors><id>28324be6-b7ca-4a88-95d4-e7eff0e62eb2</id><parentGroupId>6399b847-98bb-4646-baa1-65523b25a7f1</parentGroupId><position><x>1953.8977580157689</x><y>520.1398708107776</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Log Level</key></entry><entry><key>Log Payload</key><value>true</value></entry><entry><key>Attributes to Log</key></entry><entry><key>Attributes to Ignore</key></entry><entry><key>Log prefix</key></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>LogAttribute</name><relationships><autoTerminate>true</autoTerminate><name>success</name></relationships><style/><type>org.apache.nifi.processors.standard.LogAttribute</type></processors><processors><id>d7775510-4547-44e6-84cb-4a9ef1670009</id><parentGroupId>6399b847-98bb-4646-baa1-65523b25a7f1</parentGroupId><position><x>1485.306604417187</x><y>929.4030679947849</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Routing Strategy</key><value>Route to 'match' if all match</value></entry><entry><key>event.name</key><value>${event.name:equals(&quot;LoginSucceeded&quot;)}</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>RouteOnAttribute</name><relationships><autoTerminate>false</autoTerminate><name>matched</name></relationships><relationships><autoTerminate>true</autoTerminate><name>unmatched</name></relationships><style/><type>org.apache.nifi.processors.standard.RouteOnAttribute</type></processors><processors><id>8d387176-e75a-48bf-9277-ff5553b4bad6</id><parentGroupId>6399b847-98bb-4646-baa1-65523b25a7f1</parentGroupId><position><x>1476.2564509555484</x><y>521.145459692536</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Script Engine</key><value>Groovy</value></entry><entry><key>Script File</key></entry><entry><key>Script Body</key><value>class GenerateFlowFileWithContent implements Processor {
def REL_SUCCESS = new Relationship.Builder()
.name('success')
.description('The flow file with the specified content and/or filename was successfully transferred')
.build();
def CONTENT = new PropertyDescriptor.Builder()
.name('File Content').description('The content for the generated flow file')
.required(false).expressionLanguageSupported(true).addValidator(Validator.VALID).build()
def CONTENT_HAS_EL = new PropertyDescriptor.Builder()
.name('Evaluate Expressions in Content').description('Whether to evaluate NiFi Expression Language constructs within the content')
.required(true).allowableValues('true','false').defaultValue('false').build()
def FILENAME = new PropertyDescriptor.Builder()
.name('Filename').description('The name of the flow file to be stored in the filename attribute')
.required(false).expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build()
@Override
void initialize(ProcessorInitializationContext context) { }
@Override
Set&lt;Relationship&gt; getRelationships() { return [REL_SUCCESS] as Set }
@Override
void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
try {
def session = sessionFactory.createSession()
def flowFile = session.create()
def hasEL = context.getProperty(CONTENT_HAS_EL).asBoolean()
def contentProp = context.getProperty(CONTENT)
def content = (hasEL ? contentProp.evaluateAttributeExpressions().value : contentProp.value) ?: ''
def filename = context.getProperty(FILENAME)?.evaluateAttributeExpressions()?.getValue()
flowFile = session.write(flowFile, { outStream -&gt;
outStream.write(content.getBytes(&quot;UTF-8&quot;))
} as OutputStreamCallback)
if(filename != null) { flowFile = session.putAttribute(flowFile, 'filename', filename) }
// transfer
session.transfer(flowFile, REL_SUCCESS)
session.commit()
} catch(e) {
throw new ProcessException(e)
}
}
@Override
Collection&lt;ValidationResult&gt; validate(ValidationContext context) { return null }
@Override
PropertyDescriptor getPropertyDescriptor(String name) {
switch(name) {
case 'File Content': return CONTENT
case 'Evaluate Expressions in Content': return CONTENT_HAS_EL
case 'Filename': return FILENAME
default: return null
}
}
@Override
void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
@Override
List&lt;PropertyDescriptor&gt; getPropertyDescriptors() { return [CONTENT, CONTENT_HAS_EL, FILENAME] as List }
@Override
String getIdentifier() { return 'GenerateFlowFile-InvokeScriptedProcessor' }
}
processor = new GenerateFlowFileWithContent()</value></entry><entry><key>Module Directory</key></entry><entry><key>File Content</key><value>{
&quot;eventname&quot;: &quot;LoginSucceeded&quot;,
&quot;data&quot;: &quot;myData&quot;
}</value></entry><entry><key>Evaluate Expressions in Content</key><value>false</value></entry><entry><key>Filename</key></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>3 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>GenerateFlowFileWithContent</name><relationships><autoTerminate>false</autoTerminate><name>success</name></relationships><style/><type>org.apache.nifi.processors.script.InvokeScriptedProcessor</type></processors></snippet><timestamp>07/07/2016 11:34:59 EDT</timestamp></template>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment