Created
July 7, 2016 15:37
-
-
Save mattyb149/a25bdda16f7766fa2a4e86386b40ee0d to your computer and use it in GitHub Desktop.
NiFi template that takes JSON input, routes on the value of its "eventname" field (matching "LoginSucceeded"), then wraps the JSON in a Cassandra Query Language (CQL) "INSERT INTO myTable JSON '...'" statement.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8" standalone="yes"?><template encoding-version="1.0"><description></description><groupId>6399b847-98bb-4646-baa1-65523b25a7f1</groupId><name>JsonToCQL</name><snippet><connections><id>945cd514-84c9-4d54-8335-315d4dfd79e7</id><parentGroupId>6399b847-98bb-4646-baa1-65523b25a7f1</parentGroupId><backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold><backPressureObjectThreshold>10000</backPressureObjectThreshold><destination><groupId>6399b847-98bb-4646-baa1-65523b25a7f1</groupId><id>28324be6-b7ca-4a88-95d4-e7eff0e62eb2</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>success</selectedRelationships><source><groupId>6399b847-98bb-4646-baa1-65523b25a7f1</groupId><id>3293abf1-1b9d-4973-b51d-4f2e6899f18d</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>a795d09a-5222-41b7-a155-d899f9254f2e</id><parentGroupId>6399b847-98bb-4646-baa1-65523b25a7f1</parentGroupId><backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold><backPressureObjectThreshold>10000</backPressureObjectThreshold><destination><groupId>6399b847-98bb-4646-baa1-65523b25a7f1</groupId><id>3293abf1-1b9d-4973-b51d-4f2e6899f18d</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>success</selectedRelationships><source><groupId>6399b847-98bb-4646-baa1-65523b25a7f1</groupId><id>fbb9aaf6-f731-45f0-aea0-04f3af6e0d23</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>e4efef99-31e0-4b53-8ff6-74cc34d12fcd</id><parentGroupId>6399b847-98bb-4646-baa1-65523b25a7f1</parentGroupId><backPressureDataSizeThreshold>1 
GB</backPressureDataSizeThreshold><backPressureObjectThreshold>10000</backPressureObjectThreshold><destination><groupId>6399b847-98bb-4646-baa1-65523b25a7f1</groupId><id>b8f08bda-66e7-4fd0-af60-d93a8bce05ff</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>success</selectedRelationships><source><groupId>6399b847-98bb-4646-baa1-65523b25a7f1</groupId><id>8d387176-e75a-48bf-9277-ff5553b4bad6</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>0fc2a7a1-d3f2-4941-a392-9130ed773afb</id><parentGroupId>6399b847-98bb-4646-baa1-65523b25a7f1</parentGroupId><backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold><backPressureObjectThreshold>10000</backPressureObjectThreshold><destination><groupId>6399b847-98bb-4646-baa1-65523b25a7f1</groupId><id>d7775510-4547-44e6-84cb-4a9ef1670009</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>matched</selectedRelationships><source><groupId>6399b847-98bb-4646-baa1-65523b25a7f1</groupId><id>b8f08bda-66e7-4fd0-af60-d93a8bce05ff</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>fcaca840-7ded-43da-87f2-1f97717f4e8d</id><parentGroupId>6399b847-98bb-4646-baa1-65523b25a7f1</parentGroupId><backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold><backPressureObjectThreshold>10000</backPressureObjectThreshold><destination><groupId>6399b847-98bb-4646-baa1-65523b25a7f1</groupId><id>fbb9aaf6-f731-45f0-aea0-04f3af6e0d23</id><type>PROCESSOR</type></destination><flowFileExpiration>0 
sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>matched</selectedRelationships><source><groupId>6399b847-98bb-4646-baa1-65523b25a7f1</groupId><id>d7775510-4547-44e6-84cb-4a9ef1670009</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><processors><id>fbb9aaf6-f731-45f0-aea0-04f3af6e0d23</id><parentGroupId>6399b847-98bb-4646-baa1-65523b25a7f1</parentGroupId><position><x>1960.7977699604194</x><y>931.2751738177224</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Regular Expression</key><value>[\r\n]</value></entry><entry><key>Replacement Value</key><value></value></entry><entry><key>Character Set</key></entry><entry><key>Maximum Buffer Size</key></entry><entry><key>Replacement Strategy</key></entry><entry><key>Evaluation Mode</key></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>ReplaceText</name><relationships><autoTerminate>true</autoTerminate><name>failure</name></relationships><relationships><autoTerminate>false</autoTerminate><name>success</name></relationships><style/><type>org.apache.nifi.processors.standard.ReplaceText</type></processors><processors><id>3293abf1-1b9d-4973-b51d-4f2e6899f18d</id><parentGroupId>6399b847-98bb-4646-baa1-65523b25a7f1</parentGroupId><position><x>1955.9090980854194</x><y>726.2798124895974</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Regular Expression</key></entry><entry><key>Replacement Value</key><value>INSERT INTO myTable JSON 
'$0'</value></entry><entry><key>Character Set</key></entry><entry><key>Maximum Buffer Size</key></entry><entry><key>Replacement Strategy</key></entry><entry><key>Evaluation Mode</key></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>ReplaceText</name><relationships><autoTerminate>true</autoTerminate><name>failure</name></relationships><relationships><autoTerminate>false</autoTerminate><name>success</name></relationships><style/><type>org.apache.nifi.processors.standard.ReplaceText</type></processors><processors><id>b8f08bda-66e7-4fd0-af60-d93a8bce05ff</id><parentGroupId>6399b847-98bb-4646-baa1-65523b25a7f1</parentGroupId><position><x>1482.2900197438173</x><y>729.2965188257252</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Destination</key><value>flowfile-attribute</value></entry><entry><key>Return Type</key></entry><entry><key>Path Not Found Behavior</key></entry><entry><key>Null Value Representation</key></entry><entry><key>event.name</key><value>$.eventname</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 
sec</yieldDuration></config><name>EvaluateJsonPath</name><relationships><autoTerminate>true</autoTerminate><name>failure</name></relationships><relationships><autoTerminate>false</autoTerminate><name>matched</name></relationships><relationships><autoTerminate>true</autoTerminate><name>unmatched</name></relationships><style/><type>org.apache.nifi.processors.standard.EvaluateJsonPath</type></processors><processors><id>28324be6-b7ca-4a88-95d4-e7eff0e62eb2</id><parentGroupId>6399b847-98bb-4646-baa1-65523b25a7f1</parentGroupId><position><x>1953.8977580157689</x><y>520.1398708107776</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Log Level</key></entry><entry><key>Log Payload</key><value>true</value></entry><entry><key>Attributes to Log</key></entry><entry><key>Attributes to Ignore</key></entry><entry><key>Log prefix</key></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>LogAttribute</name><relationships><autoTerminate>true</autoTerminate><name>success</name></relationships><style/><type>org.apache.nifi.processors.standard.LogAttribute</type></processors><processors><id>d7775510-4547-44e6-84cb-4a9ef1670009</id><parentGroupId>6399b847-98bb-4646-baa1-65523b25a7f1</parentGroupId><position><x>1485.306604417187</x><y>929.4030679947849</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Routing Strategy</key><value>Route to 'match' if all 
match</value></entry><entry><key>event.name</key><value>${event.name:equals("LoginSucceeded")}</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>RouteOnAttribute</name><relationships><autoTerminate>false</autoTerminate><name>matched</name></relationships><relationships><autoTerminate>true</autoTerminate><name>unmatched</name></relationships><style/><type>org.apache.nifi.processors.standard.RouteOnAttribute</type></processors><processors><id>8d387176-e75a-48bf-9277-ff5553b4bad6</id><parentGroupId>6399b847-98bb-4646-baa1-65523b25a7f1</parentGroupId><position><x>1476.2564509555484</x><y>521.145459692536</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Script Engine</key><value>Groovy</value></entry><entry><key>Script File</key></entry><entry><key>Script Body</key><value>class GenerateFlowFileWithContent implements Processor { | |
// The only relationship this processor exposes; generated flow files are sent here.
def REL_SUCCESS = new Relationship.Builder()
    .name('success')
    .description('The flow file with the specified content and/or filename was successfully transferred')
    .build()

// Optional content for the generated flow file; expression language is supported.
def CONTENT = new PropertyDescriptor.Builder()
    .name('File Content')
    .description('The content for the generated flow file')
    .required(false)
    .expressionLanguageSupported(true)
    .addValidator(Validator.VALID)
    .build()

// Required flag restricted to 'true'/'false' (default 'false') that controls
// whether expressions inside the content are evaluated.
def CONTENT_HAS_EL = new PropertyDescriptor.Builder()
    .name('Evaluate Expressions in Content')
    .description('Whether to evaluate NiFi Expression Language constructs within the content')
    .required(true)
    .allowableValues('true','false')
    .defaultValue('false')
    .build()

// Optional, non-empty value for the 'filename' attribute; expression language is supported.
def FILENAME = new PropertyDescriptor.Builder()
    .name('Filename')
    .description('The name of the flow file to be stored in the filename attribute')
    .required(false)
    .expressionLanguageSupported(true)
    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    .build()
// Nothing to set up; present only because the Processor interface requires it.
@Override
void initialize(ProcessorInitializationContext context) {
    // intentionally empty
}
// Returns the set of relationships, which contains only REL_SUCCESS.
@Override
Set<Relationship> getRelationships() {
    def supported = [REL_SUCCESS]
    return supported as Set
}
/**
 * Creates a new flow file, writes the configured 'File Content' into it as
 * UTF-8, optionally sets its 'filename' attribute, and transfers it to
 * REL_SUCCESS before committing the session.
 *
 * @param context        supplies the values of the CONTENT, CONTENT_HAS_EL and FILENAME properties
 * @param sessionFactory used to create the session that owns the new flow file
 * @throws ProcessException wrapping any exception raised while building or
 *         transferring the flow file; the session is rolled back first
 */
@Override
void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
    // Declared outside the try block so the catch block can roll it back.
    def session = null
    try {
        session = sessionFactory.createSession()
        def flowFile = session.create()

        // Expressions in the content are evaluated only when the flag property is true.
        def hasEL = context.getProperty(CONTENT_HAS_EL).asBoolean()
        def contentProp = context.getProperty(CONTENT)
        // An unset content property falls back to an empty string.
        def content = (hasEL ? contentProp.evaluateAttributeExpressions().value : contentProp.value) ?: ''
        def filename = context.getProperty(FILENAME)?.evaluateAttributeExpressions()?.getValue()

        flowFile = session.write(flowFile, { outStream ->
            outStream.write(content.getBytes("UTF-8"))
        } as OutputStreamCallback)

        // The 'filename' attribute is only overridden when a value was configured.
        if (filename != null) {
            flowFile = session.putAttribute(flowFile, 'filename', filename)
        }

        // transfer
        session.transfer(flowFile, REL_SUCCESS)
        session.commit()
    } catch(e) {
        // Roll back so a partially built flow file is not left in an
        // uncommitted session; session is null if createSession() itself failed.
        session?.rollback()
        throw new ProcessException(e)
    }
}
// Performs no validation of its own and always returns null.
@Override
Collection<ValidationResult> validate(ValidationContext context) {
    null
}
// Looks up a property descriptor by its name; unknown names yield null.
@Override
PropertyDescriptor getPropertyDescriptor(String name) {
    def descriptorsByName = [
        'File Content'                   : CONTENT,
        'Evaluate Expressions in Content': CONTENT_HAS_EL,
        'Filename'                       : FILENAME
    ]
    return descriptorsByName.get(name)
}
// Property changes require no reaction; the method body is deliberately empty.
@Override
void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
    // intentionally empty
}
// Lists the supported properties in the order CONTENT, CONTENT_HAS_EL, FILENAME.
@Override
List<PropertyDescriptor> getPropertyDescriptors() {
    def descriptors = [CONTENT, CONTENT_HAS_EL, FILENAME]
    return descriptors as List
}
// Returns the fixed identifier string for this scripted processor.
@Override
String getIdentifier() {
    'GenerateFlowFile-InvokeScriptedProcessor'
}
} | |
processor = new GenerateFlowFileWithContent()</value></entry><entry><key>Module Directory</key></entry><entry><key>File Content</key><value>{ | |
"eventname": "LoginSucceeded", | |
"data": "myData" | |
}</value></entry><entry><key>Evaluate Expressions in Content</key><value>false</value></entry><entry><key>Filename</key></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>3 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>GenerateFlowFileWithContent</name><relationships><autoTerminate>false</autoTerminate><name>success</name></relationships><style/><type>org.apache.nifi.processors.script.InvokeScriptedProcessor</type></processors></snippet><timestamp>07/07/2016 11:34:59 EDT</timestamp></template> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment