Skip to content

Instantly share code, notes, and snippets.

@mattyb149
Created August 29, 2016 15:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mattyb149/f9bd1a8598eaf13db50f9cd73b8a3e06 to your computer and use it in GitHub Desktop.
Save mattyb149/f9bd1a8598eaf13db50f9cd73b8a3e06 to your computer and use it in GitHub Desktop.
Apache NiFi 1.0 template to convert CSV files to Cassandra Query Language (CQL) statements and execute them
<?xml version="1.0" ?>
<template encoding-version="1.0">
<description>This template describes a flow in which a CSV file (whose filename and content both contribute to the fields of a Cassandra table) is processed, and CQL statements are then constructed and executed.</description>
<groupId>d6aa94a0-0156-1000-71a4-a96b6da4672f</groupId>
<name>ConvertCSVtoCQL</name>
<snippet>
<connections>
<id>b7b54e60-d92f-4bb1-0000-000000000000</id>
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>0 MB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>0</backPressureObjectThreshold>
<destination>
<groupId>d6aa94a0-0156-1000-0000-000000000000</groupId>
<id>7969ae1c-8754-4f49-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>d6aa94a0-0156-1000-0000-000000000000</groupId>
<id>d794ca20-98c5-4fc5-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>d6ad6062-0156-1000-0000-000000000000</id>
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>d6aa94a0-0156-1000-0000-000000000000</groupId>
<id>d6ad3cb7-0156-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>splits</selectedRelationships>
<source>
<groupId>d6aa94a0-0156-1000-0000-000000000000</groupId>
<id>d6abbdf3-0156-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>d6aea457-0156-1000-0000-000000000000</id>
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>d6aa94a0-0156-1000-0000-000000000000</groupId>
<id>d6ae76e5-0156-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>d6aa94a0-0156-1000-0000-000000000000</groupId>
<id>8e949039-0779-458d-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>d6aebdbc-0156-1000-0000-000000000000</id>
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>d6aa94a0-0156-1000-0000-000000000000</groupId>
<id>d6abbdf3-0156-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>d6aa94a0-0156-1000-0000-000000000000</groupId>
<id>d6ae76e5-0156-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>d6b4f5b7-0156-1000-0000-000000000000</id>
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>d6aa94a0-0156-1000-0000-000000000000</groupId>
<id>d6b4e56f-0156-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>matched</selectedRelationships>
<source>
<groupId>d6aa94a0-0156-1000-0000-000000000000</groupId>
<id>d6ad3cb7-0156-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>d6be6cc3-0156-1000-0000-000000000000</id>
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>d6aa94a0-0156-1000-0000-000000000000</groupId>
<id>d6be429d-0156-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>d6aa94a0-0156-1000-0000-000000000000</groupId>
<id>d6b4e56f-0156-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>d6d242d1-0156-1000-0000-000000000000</id>
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>d6aa94a0-0156-1000-0000-000000000000</groupId>
<id>7969ae1c-8754-4f49-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>d6aa94a0-0156-1000-0000-000000000000</groupId>
<id>d6be429d-0156-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<labels>
<id>d6e8d098-0156-1000-0000-000000000000</id>
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId>
<position>
<x>885.9999917116437</x>
<y>707.1400005203105</y>
</position>
<height>94.0</height>
<label>Creates the keyspace, drops the table,
and creates the table. This is a helper
for the example flow, should only be run
once before the rest of the flow, and can
be removed if the keyspace and table exist.
&lt;--</label>
<style>
<entry>
<key>font-size</key>
<value>12px</value>
</entry>
</style>
<width>246.99996948242188</width>
</labels>
<labels>
<id>d6e9804a-0156-1000-0000-000000000000</id>
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId>
<position>
<x>371.9999917116436</x>
<y>25.140004335007745</y>
</position>
<height>56.999996185302734</height>
<label>Generates a test CSV file with content.
&lt;--</label>
<style>
<entry>
<key>font-size</key>
<value>12px</value>
</entry>
</style>
<width>217.99996948242188</width>
</labels>
<labels>
<id>d6eadf91-0156-1000-0000-000000000000</id>
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId>
<position>
<x>368.9999917116436</x>
<y>228.14000052031048</y>
</position>
<height>67.0</height>
<label>The filename is station1_sensor2.csv,
these segments are parsed to get
fields for the CQL INSERTs.
&lt;--</label>
<style>
<entry>
<key>font-size</key>
<value>12px</value>
</entry>
</style>
<width>216.99996948242188</width>
</labels>
<labels>
<id>d6ec9645-0156-1000-0000-000000000000</id>
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId>
<position>
<x>1036.9999917116438</x>
<y>24.140004335007745</y>
</position>
<height>61.0</height>
<label>This attempts to strip the microseconds
from the timestamp, but ends up truncating
all fractional seconds.
&lt;--</label>
<style>
<entry>
<key>font-size</key>
<value>12px</value>
</entry>
</style>
<width>243.99996948242188</width>
</labels>
<labels>
<id>d6ed7b8f-0156-1000-0000-000000000000</id>
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId>
<position>
<x>1034.9999917116438</x>
<y>269.1400005203105</y>
</position>
<height>73.0</height>
<label>Builds a CQL INSERT statement with
explicit values. Also could use UpdateAttribute
in order to use prepared statements.
&lt;--</label>
<style>
<entry>
<key>font-size</key>
<value>12px</value>
</entry>
</style>
<width>271.9999694824219</width>
</labels>
<labels>
<id>d6ef37d2-0156-1000-0000-000000000000</id>
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId>
<position>
<x>1039.9999917116438</x>
<y>511.14000052031054</y>
</position>
<height>48.000003814697266</height>
<label>Executes the CQL statements
(INSERT or DDL)
&lt;--</label>
<style>
<entry>
<key>font-size</key>
<value>12px</value>
</entry>
</style>
<width>174.99996948242188</width>
</labels>
<labels>
<id>d6f0d22f-0156-1000-0000-000000000000</id>
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId>
<position>
<x>365.9999917116436</x>
<y>619.1400005203105</y>
</position>
<height>48.000003814697266</height>
<label>Uses regular expressions to
group the columns
&lt;--</label>
<style>
<entry>
<key>font-size</key>
<value>12px</value>
</entry>
</style>
<width>172.99996948242188</width>
</labels>
<processors>
<id>8e949039-0779-458d-0000-000000000000</id>
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>1.0000114440917969</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Script Engine</key>
<value>
<name>Script Engine</name>
</value>
</entry>
<entry>
<key>Script File</key>
<value>
<name>Script File</name>
</value>
</entry>
<entry>
<key>Script Body</key>
<value>
<name>Script Body</name>
</value>
</entry>
<entry>
<key>Module Directory</key>
<value>
<name>Module Directory</name>
</value>
</entry>
<entry>
<key>File Content</key>
<value>
<name>File Content</name>
</value>
</entry>
<entry>
<key>Evaluate Expressions in Content</key>
<value>
<name>Evaluate Expressions in Content</name>
</value>
</entry>
<entry>
<key>Filename</key>
<value>
<name>Filename</name>
</value>
</entry>
</descriptors>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Script Engine</key>
<value>Groovy</value>
</entry>
<entry>
<key>Script File</key>
</entry>
<entry>
<key>Script Body</key>
<value>/*
 * Scripted processor hosted by InvokeScriptedProcessor. Each trigger creates one flow file,
 * writes the 'File Content' property into it, optionally sets the 'filename' attribute from
 * the 'Filename' property, and routes the result to 'success'.
 */
class GenerateFlowFileWithContent implements Processor {
    // The only relationship this processor exposes.
    def REL_SUCCESS = new Relationship.Builder()
        .name('success')
        .description('The flow file with the specified content and/or filename was successfully transferred')
        .build()
    // Optional text written to the generated flow file.
    def CONTENT = new PropertyDescriptor.Builder()
        .name('File Content').description('The content for the generated flow file')
        .required(false).expressionLanguageSupported(true).addValidator(Validator.VALID).build()
    // Switch deciding whether Expression Language inside the content is evaluated or kept verbatim.
    def CONTENT_HAS_EL = new PropertyDescriptor.Builder()
        .name('Evaluate Expressions in Content').description('Whether to evaluate NiFi Expression Language constructs within the content')
        .required(true).allowableValues('true','false').defaultValue('false').build()
    // Optional value for the 'filename' attribute of the generated flow file.
    def FILENAME = new PropertyDescriptor.Builder()
        .name('Filename').description('The name of the flow file to be stored in the filename attribute')
        .required(false).expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build()

    @Override
    void initialize(ProcessorInitializationContext context) { }

    @Override
    Set&lt;Relationship&gt; getRelationships() { return [REL_SUCCESS] as Set }

    /**
     * Builds one flow file from the configured properties and transfers it to 'success'.
     * The session is obtained from the supplied factory, so this method is responsible for
     * committing it on success and rolling it back on failure.
     *
     * @throws ProcessException wrapping any error raised while building the flow file
     */
    @Override
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        // Declared before the try block so the catch clause can roll the session back.
        def session = null
        try {
            session = sessionFactory.createSession()
            def flowFile = session.create()
            def hasEL = context.getProperty(CONTENT_HAS_EL).asBoolean()
            def contentProp = context.getProperty(CONTENT)
            // Fall back to empty content when the property is unset or evaluates to nothing.
            def content = (hasEL ? contentProp.evaluateAttributeExpressions().value : contentProp.value) ?: ''
            def filename = context.getProperty(FILENAME)?.evaluateAttributeExpressions()?.getValue()
            flowFile = session.write(flowFile, { outStream -&gt;
                outStream.write(content.getBytes("UTF-8"))
            } as OutputStreamCallback)
            if(filename != null) { flowFile = session.putAttribute(flowFile, 'filename', filename) }
            // transfer
            session.transfer(flowFile, REL_SUCCESS)
            session.commit()
        } catch(e) {
            // Without a rollback, the flow file created above would stay in an uncommitted session.
            session?.rollback()
            throw new ProcessException(e)
        }
    }

    // No script-level validation results are produced.
    @Override
    Collection&lt;ValidationResult&gt; validate(ValidationContext context) { return null }

    // Looks up a descriptor by its display name; unknown names yield null.
    @Override
    PropertyDescriptor getPropertyDescriptor(String name) {
        switch(name) {
            case 'File Content': return CONTENT
            case 'Evaluate Expressions in Content': return CONTENT_HAS_EL
            case 'Filename': return FILENAME
            default: return null
        }
    }

    @Override
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }

    @Override
    List&lt;PropertyDescriptor&gt; getPropertyDescriptors() { return [CONTENT, CONTENT_HAS_EL, FILENAME] as List }

    @Override
    String getIdentifier() { return 'GenerateFlowFile-InvokeScriptedProcessor' }
}
// InvokeScriptedProcessor picks up the instance assigned to the 'processor' variable.
processor = new GenerateFlowFileWithContent()</value>
</entry>
<entry>
<key>Module Directory</key>
</entry>
<entry>
<key>File Content</key>
<value>2016-05-04 03:02:01.001000+0000;0;
2016-05-04 03:02:01.002000+0000;0.1234;
2016-05-04 03:02:01.003000+0000;0.2345;</value>
</entry>
<entry>
<key>Evaluate Expressions in Content</key>
<value>false</value>
</entry>
<entry>
<key>Filename</key>
<value>station1_sensor2.csv</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>30 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>GenerateFlowFileWithContent</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.script.InvokeScriptedProcessor</type>
</processors>
<processors>
<id>d6abbdf3-0156-1000-0000-000000000000</id>
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId>
<position>
<x>4.0</x>
<y>391.99999618530273</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Line Split Count</key>
<value>
<name>Line Split Count</name>
</value>
</entry>
<entry>
<key>Maximum Fragment Size</key>
<value>
<name>Maximum Fragment Size</name>
</value>
</entry>
<entry>
<key>Header Line Count</key>
<value>
<name>Header Line Count</name>
</value>
</entry>
<entry>
<key>Header Line Marker Characters</key>
<value>
<name>Header Line Marker Characters</name>
</value>
</entry>
<entry>
<key>Remove Trailing Newlines</key>
<value>
<name>Remove Trailing Newlines</name>
</value>
</entry>
</descriptors>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Line Split Count</key>
<value>1</value>
</entry>
<entry>
<key>Maximum Fragment Size</key>
</entry>
<entry>
<key>Header Line Count</key>
<value>0</value>
</entry>
<entry>
<key>Header Line Marker Characters</key>
</entry>
<entry>
<key>Remove Trailing Newlines</key>
<value>true</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Split CSV Lines</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>original</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>splits</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.SplitText</type>
</processors>
<processors>
<id>d6ad3cb7-0156-1000-0000-000000000000</id>
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId>
<position>
<x>1.0</x>
<y>612.9999961853027</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Character Set</key>
<value>
<name>Character Set</name>
</value>
</entry>
<entry>
<key>Maximum Buffer Size</key>
<value>
<name>Maximum Buffer Size</name>
</value>
</entry>
<entry>
<key>Maximum Capture Group Length</key>
<value>
<name>Maximum Capture Group Length</name>
</value>
</entry>
<entry>
<key>Enable Canonical Equivalence</key>
<value>
<name>Enable Canonical Equivalence</name>
</value>
</entry>
<entry>
<key>Enable Case-insensitive Matching</key>
<value>
<name>Enable Case-insensitive Matching</name>
</value>
</entry>
<entry>
<key>Permit Whitespace and Comments in Pattern</key>
<value>
<name>Permit Whitespace and Comments in Pattern</name>
</value>
</entry>
<entry>
<key>Enable DOTALL Mode</key>
<value>
<name>Enable DOTALL Mode</name>
</value>
</entry>
<entry>
<key>Enable Literal Parsing of the Pattern</key>
<value>
<name>Enable Literal Parsing of the Pattern</name>
</value>
</entry>
<entry>
<key>Enable Multiline Mode</key>
<value>
<name>Enable Multiline Mode</name>
</value>
</entry>
<entry>
<key>Enable Unicode-aware Case Folding</key>
<value>
<name>Enable Unicode-aware Case Folding</name>
</value>
</entry>
<entry>
<key>Enable Unicode Predefined Character Classes</key>
<value>
<name>Enable Unicode Predefined Character Classes</name>
</value>
</entry>
<entry>
<key>Enable Unix Lines Mode</key>
<value>
<name>Enable Unix Lines Mode</name>
</value>
</entry>
<entry>
<key>Include Capture Group 0</key>
<value>
<name>Include Capture Group 0</name>
</value>
</entry>
<entry>
<key>column</key>
<value>
<name>column</name>
</value>
</entry>
</descriptors>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Character Set</key>
<value>UTF-8</value>
</entry>
<entry>
<key>Maximum Buffer Size</key>
<value>1 MB</value>
</entry>
<entry>
<key>Maximum Capture Group Length</key>
<value>1024</value>
</entry>
<entry>
<key>Enable Canonical Equivalence</key>
<value>false</value>
</entry>
<entry>
<key>Enable Case-insensitive Matching</key>
<value>false</value>
</entry>
<entry>
<key>Permit Whitespace and Comments in Pattern</key>
<value>false</value>
</entry>
<entry>
<key>Enable DOTALL Mode</key>
<value>false</value>
</entry>
<entry>
<key>Enable Literal Parsing of the Pattern</key>
<value>false</value>
</entry>
<entry>
<key>Enable Multiline Mode</key>
<value>false</value>
</entry>
<entry>
<key>Enable Unicode-aware Case Folding</key>
<value>false</value>
</entry>
<entry>
<key>Enable Unicode Predefined Character Classes</key>
<value>false</value>
</entry>
<entry>
<key>Enable Unix Lines Mode</key>
<value>false</value>
</entry>
<entry>
<key>Include Capture Group 0</key>
<value>true</value>
</entry>
<entry>
<key>column</key>
<value>([^;]*?);([^;]*?);([^;]*?)</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Extract Column Values</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>matched</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>unmatched</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.ExtractText</type>
</processors>
<processors>
<id>d6ae76e5-0156-1000-0000-000000000000</id>
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId>
<position>
<x>4.0</x>
<y>203.0000114440918</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Delete Attributes Expression</key>
<value>
<name>Delete Attributes Expression</name>
</value>
</entry>
<entry>
<key>sensor.name</key>
<value>
<name>sensor.name</name>
</value>
</entry>
<entry>
<key>station.name</key>
<value>
<name>station.name</name>
</value>
</entry>
</descriptors>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Delete Attributes Expression</key>
</entry>
<entry>
<key>sensor.name</key>
<value>${filename:substringAfter('_'):substringBefore('.csv')}</value>
</entry>
<entry>
<key>station.name</key>
<value>${filename:substringBefore('_')}</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Parse Filename</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
<processors>
<id>d6b4e56f-0156-1000-0000-000000000000</id>
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId>
<position>
<x>656.0</x>
<y>0.0</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Delete Attributes Expression</key>
<value>
<name>Delete Attributes Expression</name>
</value>
</entry>
<entry>
<key>tps</key>
<value>
<name>tps</name>
</value>
</entry>
</descriptors>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Delete Attributes Expression</key>
</entry>
<!-- tps: trims the six fractional digits of column.1 down to three (microseconds to
     milliseconds) with a plain string substitution. The earlier toDate('...SSSSSSZ') approach
     read the six digits as a millisecond count (for example 001000 became 1000 ms), which
     moved the timestamp forward by whole seconds before it was formatted again.
     The canvas label next to this processor still describes that earlier behaviour. -->
<entry>
<key>tps</key>
<value>${column.1:replaceAll('([.][0-9]{3})[0-9]{3}', '$1')}</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Convert Timestamp</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
<processors>
<id>d6be429d-0156-1000-0000-000000000000</id>
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId>
<position>
<x>652.9999999999999</x>
<y>245.99999618530273</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Regular Expression</key>
<value>
<name>Regular Expression</name>
</value>
</entry>
<entry>
<key>Replacement Value</key>
<value>
<name>Replacement Value</name>
</value>
</entry>
<entry>
<key>Character Set</key>
<value>
<name>Character Set</name>
</value>
</entry>
<entry>
<key>Maximum Buffer Size</key>
<value>
<name>Maximum Buffer Size</name>
</value>
</entry>
<entry>
<key>Replacement Strategy</key>
<value>
<name>Replacement Strategy</name>
</value>
</entry>
<entry>
<key>Evaluation Mode</key>
<value>
<name>Evaluation Mode</name>
</value>
</entry>
</descriptors>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Regular Expression</key>
<value>(?s:^.*$)</value>
</entry>
<entry>
<key>Replacement Value</key>
<value>insert into sensor_data (station_id, sensor_id, tps, val) values ('${station.name}', '${sensor.name}', '${tps}', ${column.2})</value>
</entry>
<entry>
<key>Character Set</key>
<value>UTF-8</value>
</entry>
<entry>
<key>Maximum Buffer Size</key>
<value>1 MB</value>
</entry>
<entry>
<key>Replacement Strategy</key>
<value>Regex Replace</value>
</entry>
<entry>
<key>Evaluation Mode</key>
<value>Entire text</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>ReplaceText</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.ReplaceText</type>
</processors>
<processors>
<id>d794ca20-98c5-4fc5-0000-000000000000</id>
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId>
<position>
<x>503.4999999999999</x>
<y>687.7600114440918</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Script Engine</key>
<value>
<name>Script Engine</name>
</value>
</entry>
<entry>
<key>Script File</key>
<value>
<name>Script File</name>
</value>
</entry>
<entry>
<key>Script Body</key>
<value>
<name>Script Body</name>
</value>
</entry>
<entry>
<key>Module Directory</key>
<value>
<name>Module Directory</name>
</value>
</entry>
</descriptors>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Script Engine</key>
<value>Groovy</value>
</entry>
<entry>
<key>Script File</key>
</entry>
<entry>
<key>Script Body</key>
<value>// Setup helper: every line of the text below is emitted as its own flow file on 'success'.
// Keep one complete CQL statement per line, because the text is split on line breaks.
def setupStatements =
"""CREATE KEYSPACE IF NOT EXISTS data with replication = {'class' : 'SimpleStrategy', 'replication_factor' : 3};
DROP TABLE IF EXISTS sensor_data;
CREATE TABLE sensor_data (station_id text, sensor_id text, tps timestamp, val float, PRIMARY KEY ((station_id, sensor_id), tps));
"""
setupStatements.eachLine { statement -&gt;
    // The statement text (UTF-8 encoded) becomes the flow file content.
    def writeStatement = { stream -&gt; stream.write(statement.getBytes('UTF-8')) } as OutputStreamCallback
    def ddlFlowFile = session.write(session.create(), writeStatement)
    session.transfer(ddlFlowFile, REL_SUCCESS)
}</value>
</entry>
<entry>
<key>Module Directory</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>60 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>ExecuteScript</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style>
<entry>
<key>background-color</key>
<value>#000000</value>
</entry>
</style>
<type>org.apache.nifi.processors.script.ExecuteScript</type>
</processors>
<processors>
<id>7969ae1c-8754-4f49-0000-000000000000</id>
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId>
<position>
<x>654.9999999999999</x>
<y>472.73999618530274</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Cassandra Contact Points</key>
<value>
<name>Cassandra Contact Points</name>
</value>
</entry>
<entry>
<key>Keyspace</key>
<value>
<name>Keyspace</name>
</value>
</entry>
<entry>
<key>SSL Context Service</key>
<value>
<identifiesControllerService>org.apache.nifi.ssl.SSLContextService</identifiesControllerService>
<name>SSL Context Service</name>
</value>
</entry>
<entry>
<key>Client Auth</key>
<value>
<name>Client Auth</name>
</value>
</entry>
<entry>
<key>Username</key>
<value>
<name>Username</name>
</value>
</entry>
<entry>
<key>Password</key>
<value>
<name>Password</name>
</value>
</entry>
<entry>
<key>Consistency Level</key>
<value>
<name>Consistency Level</name>
</value>
</entry>
<entry>
<key>Character Set</key>
<value>
<name>Character Set</name>
</value>
</entry>
<entry>
<key>Max Wait Time</key>
<value>
<name>Max Wait Time</name>
</value>
</entry>
</descriptors>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Cassandra Contact Points</key>
<value>127.0.0.1:9042</value>
</entry>
<entry>
<key>Keyspace</key>
<value>data</value>
</entry>
<entry>
<key>SSL Context Service</key>
</entry>
<entry>
<key>Client Auth</key>
<value>REQUIRED</value>
</entry>
<entry>
<key>Username</key>
<value>cassandra</value>
</entry>
<entry>
<key>Password</key>
</entry>
<entry>
<key>Consistency Level</key>
<value>ONE</value>
</entry>
<entry>
<key>Character Set</key>
<value>UTF-8</value>
</entry>
<entry>
<key>Max Wait Time</key>
<value>0 seconds</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>PutCassandraQL</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>retry</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>success</name>
</relationships>
<style>
<entry>
<key>background-color</key>
<value>#6febb9</value>
</entry>
</style>
<type>org.apache.nifi.processors.cassandra.PutCassandraQL</type>
</processors>
</snippet>
<timestamp>08/29/2016 11:39:23 EDT</timestamp>
</template>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment