Created
August 29, 2016 15:40
-
-
Save mattyb149/f9bd1a8598eaf13db50f9cd73b8a3e06 to your computer and use it in GitHub Desktop.
Apache NiFi 1.0 template to convert CSV files to Cassandra Query Language (CQL) statements and execute them
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" ?> | |
<template encoding-version="1.0"> | |
<description>This template describes a flow where a CSV file (whose filename and content contribute to the fields in a Cassandra table) is processed, then CQL statements are constructed and executed.</description>
<groupId>d6aa94a0-0156-1000-71a4-a96b6da4672f</groupId> | |
<name>ConvertCSVtoCQL</name> | |
<snippet> | |
<connections> | |
<id>b7b54e60-d92f-4bb1-0000-000000000000</id> | |
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId> | |
<backPressureDataSizeThreshold>0 MB</backPressureDataSizeThreshold> | |
<backPressureObjectThreshold>0</backPressureObjectThreshold> | |
<destination> | |
<groupId>d6aa94a0-0156-1000-0000-000000000000</groupId> | |
<id>7969ae1c-8754-4f49-0000-000000000000</id> | |
<type>PROCESSOR</type> | |
</destination> | |
<flowFileExpiration>0 sec</flowFileExpiration> | |
<labelIndex>1</labelIndex> | |
<name></name> | |
<selectedRelationships>success</selectedRelationships> | |
<source> | |
<groupId>d6aa94a0-0156-1000-0000-000000000000</groupId> | |
<id>d794ca20-98c5-4fc5-0000-000000000000</id> | |
<type>PROCESSOR</type> | |
</source> | |
<zIndex>0</zIndex> | |
</connections> | |
<connections> | |
<id>d6ad6062-0156-1000-0000-000000000000</id> | |
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId> | |
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold> | |
<backPressureObjectThreshold>10000</backPressureObjectThreshold> | |
<destination> | |
<groupId>d6aa94a0-0156-1000-0000-000000000000</groupId> | |
<id>d6ad3cb7-0156-1000-0000-000000000000</id> | |
<type>PROCESSOR</type> | |
</destination> | |
<flowFileExpiration>0 sec</flowFileExpiration> | |
<labelIndex>1</labelIndex> | |
<name></name> | |
<selectedRelationships>splits</selectedRelationships> | |
<source> | |
<groupId>d6aa94a0-0156-1000-0000-000000000000</groupId> | |
<id>d6abbdf3-0156-1000-0000-000000000000</id> | |
<type>PROCESSOR</type> | |
</source> | |
<zIndex>0</zIndex> | |
</connections> | |
<connections> | |
<id>d6aea457-0156-1000-0000-000000000000</id> | |
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId> | |
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold> | |
<backPressureObjectThreshold>10000</backPressureObjectThreshold> | |
<destination> | |
<groupId>d6aa94a0-0156-1000-0000-000000000000</groupId> | |
<id>d6ae76e5-0156-1000-0000-000000000000</id> | |
<type>PROCESSOR</type> | |
</destination> | |
<flowFileExpiration>0 sec</flowFileExpiration> | |
<labelIndex>1</labelIndex> | |
<name></name> | |
<selectedRelationships>success</selectedRelationships> | |
<source> | |
<groupId>d6aa94a0-0156-1000-0000-000000000000</groupId> | |
<id>8e949039-0779-458d-0000-000000000000</id> | |
<type>PROCESSOR</type> | |
</source> | |
<zIndex>0</zIndex> | |
</connections> | |
<connections> | |
<id>d6aebdbc-0156-1000-0000-000000000000</id> | |
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId> | |
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold> | |
<backPressureObjectThreshold>10000</backPressureObjectThreshold> | |
<destination> | |
<groupId>d6aa94a0-0156-1000-0000-000000000000</groupId> | |
<id>d6abbdf3-0156-1000-0000-000000000000</id> | |
<type>PROCESSOR</type> | |
</destination> | |
<flowFileExpiration>0 sec</flowFileExpiration> | |
<labelIndex>1</labelIndex> | |
<name></name> | |
<selectedRelationships>success</selectedRelationships> | |
<source> | |
<groupId>d6aa94a0-0156-1000-0000-000000000000</groupId> | |
<id>d6ae76e5-0156-1000-0000-000000000000</id> | |
<type>PROCESSOR</type> | |
</source> | |
<zIndex>0</zIndex> | |
</connections> | |
<connections> | |
<id>d6b4f5b7-0156-1000-0000-000000000000</id> | |
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId> | |
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold> | |
<backPressureObjectThreshold>10000</backPressureObjectThreshold> | |
<destination> | |
<groupId>d6aa94a0-0156-1000-0000-000000000000</groupId> | |
<id>d6b4e56f-0156-1000-0000-000000000000</id> | |
<type>PROCESSOR</type> | |
</destination> | |
<flowFileExpiration>0 sec</flowFileExpiration> | |
<labelIndex>1</labelIndex> | |
<name></name> | |
<selectedRelationships>matched</selectedRelationships> | |
<source> | |
<groupId>d6aa94a0-0156-1000-0000-000000000000</groupId> | |
<id>d6ad3cb7-0156-1000-0000-000000000000</id> | |
<type>PROCESSOR</type> | |
</source> | |
<zIndex>0</zIndex> | |
</connections> | |
<connections> | |
<id>d6be6cc3-0156-1000-0000-000000000000</id> | |
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId> | |
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold> | |
<backPressureObjectThreshold>10000</backPressureObjectThreshold> | |
<destination> | |
<groupId>d6aa94a0-0156-1000-0000-000000000000</groupId> | |
<id>d6be429d-0156-1000-0000-000000000000</id> | |
<type>PROCESSOR</type> | |
</destination> | |
<flowFileExpiration>0 sec</flowFileExpiration> | |
<labelIndex>1</labelIndex> | |
<name></name> | |
<selectedRelationships>success</selectedRelationships> | |
<source> | |
<groupId>d6aa94a0-0156-1000-0000-000000000000</groupId> | |
<id>d6b4e56f-0156-1000-0000-000000000000</id> | |
<type>PROCESSOR</type> | |
</source> | |
<zIndex>0</zIndex> | |
</connections> | |
<connections> | |
<id>d6d242d1-0156-1000-0000-000000000000</id> | |
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId> | |
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold> | |
<backPressureObjectThreshold>10000</backPressureObjectThreshold> | |
<destination> | |
<groupId>d6aa94a0-0156-1000-0000-000000000000</groupId> | |
<id>7969ae1c-8754-4f49-0000-000000000000</id> | |
<type>PROCESSOR</type> | |
</destination> | |
<flowFileExpiration>0 sec</flowFileExpiration> | |
<labelIndex>1</labelIndex> | |
<name></name> | |
<selectedRelationships>success</selectedRelationships> | |
<source> | |
<groupId>d6aa94a0-0156-1000-0000-000000000000</groupId> | |
<id>d6be429d-0156-1000-0000-000000000000</id> | |
<type>PROCESSOR</type> | |
</source> | |
<zIndex>0</zIndex> | |
</connections> | |
<labels> | |
<id>d6e8d098-0156-1000-0000-000000000000</id> | |
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId> | |
<position> | |
<x>885.9999917116437</x> | |
<y>707.1400005203105</y> | |
</position> | |
<height>94.0</height> | |
<label>Creates the keyspace, drops the table, | |
and creates the table. This is a helper | |
for the example flow, should only be run | |
once before the rest of the flow, and can | |
be removed if the keyspace and table exist. | |
&lt;--</label>
<style> | |
<entry> | |
<key>font-size</key> | |
<value>12px</value> | |
</entry> | |
</style> | |
<width>246.99996948242188</width> | |
</labels> | |
<labels> | |
<id>d6e9804a-0156-1000-0000-000000000000</id> | |
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId> | |
<position> | |
<x>371.9999917116436</x> | |
<y>25.140004335007745</y> | |
</position> | |
<height>56.999996185302734</height> | |
<label>Generates a test CSV file with content. | |
&lt;--</label>
<style> | |
<entry> | |
<key>font-size</key> | |
<value>12px</value> | |
</entry> | |
</style> | |
<width>217.99996948242188</width> | |
</labels> | |
<labels> | |
<id>d6eadf91-0156-1000-0000-000000000000</id> | |
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId> | |
<position> | |
<x>368.9999917116436</x> | |
<y>228.14000052031048</y> | |
</position> | |
<height>67.0</height> | |
<label>The filename is station1_sensor2.csv, | |
these segments are parsed to get | |
fields for the CQL INSERTs. | |
&lt;--</label>
<style> | |
<entry> | |
<key>font-size</key> | |
<value>12px</value> | |
</entry> | |
</style> | |
<width>216.99996948242188</width> | |
</labels> | |
<labels> | |
<id>d6ec9645-0156-1000-0000-000000000000</id> | |
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId> | |
<position> | |
<x>1036.9999917116438</x> | |
<y>24.140004335007745</y> | |
</position> | |
<height>61.0</height> | |
<label>This attempts to strip the microseconds | |
from the timestamp, but ends up truncating | |
all fractional seconds. | |
&lt;--</label>
<style> | |
<entry> | |
<key>font-size</key> | |
<value>12px</value> | |
</entry> | |
</style> | |
<width>243.99996948242188</width> | |
</labels> | |
<labels> | |
<id>d6ed7b8f-0156-1000-0000-000000000000</id> | |
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId> | |
<position> | |
<x>1034.9999917116438</x> | |
<y>269.1400005203105</y> | |
</position> | |
<height>73.0</height> | |
<label>Builds a CQL INSERT statement with | |
explicit values. Also could use UpdateAttribute | |
in order to use prepared statements. | |
&lt;--</label>
<style> | |
<entry> | |
<key>font-size</key> | |
<value>12px</value> | |
</entry> | |
</style> | |
<width>271.9999694824219</width> | |
</labels> | |
<labels> | |
<id>d6ef37d2-0156-1000-0000-000000000000</id> | |
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId> | |
<position> | |
<x>1039.9999917116438</x> | |
<y>511.14000052031054</y> | |
</position> | |
<height>48.000003814697266</height> | |
<label>Executes the CQL statements | |
(INSERT or DDL) | |
&lt;--</label>
<style> | |
<entry> | |
<key>font-size</key> | |
<value>12px</value> | |
</entry> | |
</style> | |
<width>174.99996948242188</width> | |
</labels> | |
<labels> | |
<id>d6f0d22f-0156-1000-0000-000000000000</id> | |
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId> | |
<position> | |
<x>365.9999917116436</x> | |
<y>619.1400005203105</y> | |
</position> | |
<height>48.000003814697266</height> | |
<label>Uses regular expressions to | |
group the columns | |
&lt;--</label>
<style> | |
<entry> | |
<key>font-size</key> | |
<value>12px</value> | |
</entry> | |
</style> | |
<width>172.99996948242188</width> | |
</labels> | |
<processors> | |
<id>8e949039-0779-458d-0000-000000000000</id> | |
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId> | |
<position> | |
<x>0.0</x> | |
<y>1.0000114440917969</y> | |
</position> | |
<config> | |
<bulletinLevel>WARN</bulletinLevel> | |
<comments></comments> | |
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount> | |
<descriptors> | |
<entry> | |
<key>Script Engine</key> | |
<value> | |
<name>Script Engine</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Script File</key> | |
<value> | |
<name>Script File</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Script Body</key> | |
<value> | |
<name>Script Body</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Module Directory</key> | |
<value> | |
<name>Module Directory</name> | |
</value> | |
</entry> | |
<entry> | |
<key>File Content</key> | |
<value> | |
<name>File Content</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Evaluate Expressions in Content</key> | |
<value> | |
<name>Evaluate Expressions in Content</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Filename</key> | |
<value> | |
<name>Filename</name> | |
</value> | |
</entry> | |
</descriptors> | |
<lossTolerant>false</lossTolerant> | |
<penaltyDuration>30 sec</penaltyDuration> | |
<properties> | |
<entry> | |
<key>Script Engine</key> | |
<value>Groovy</value> | |
</entry> | |
<entry> | |
<key>Script File</key> | |
</entry> | |
<entry>
<key>Script Body</key>
<value><![CDATA[/**
 * Scripted processor (for InvokeScriptedProcessor) that creates one flow file
 * per trigger, fills it with the configured 'File Content' and, when the
 * 'Filename' property is set, stores that value in the 'filename' attribute.
 *
 * No imports are declared: the NiFi API classes used below are presumably
 * pre-imported by the Groovy script engine -- confirm for your NiFi version.
 *
 * The script body is wrapped in CDATA because it contains '<' characters
 * (generic types), which may not appear unescaped in XML text.
 */
class GenerateFlowFileWithContent implements Processor {

    // The only relationship: every generated flow file is routed here.
    def REL_SUCCESS = new Relationship.Builder()
            .name('success')
            .description('The flow file with the specified content and/or filename was successfully transferred')
            .build()

    // Text written into the generated flow file (optional; empty content if unset).
    def CONTENT = new PropertyDescriptor.Builder()
            .name('File Content').description('The content for the generated flow file')
            .required(false).expressionLanguageSupported(true).addValidator(Validator.VALID).build()

    // Switch deciding whether Expression Language inside 'File Content' is evaluated.
    def CONTENT_HAS_EL = new PropertyDescriptor.Builder()
            .name('Evaluate Expressions in Content').description('Whether to evaluate NiFi Expression Language constructs within the content')
            .required(true).allowableValues('true','false').defaultValue('false').build()

    // Optional value for the 'filename' attribute of the generated flow file.
    def FILENAME = new PropertyDescriptor.Builder()
            .name('Filename').description('The name of the flow file to be stored in the filename attribute')
            .required(false).expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build()

    @Override
    void initialize(ProcessorInitializationContext context) { }

    @Override
    Set<Relationship> getRelationships() { return [REL_SUCCESS] as Set }

    /**
     * Creates a flow file, writes the configured content as UTF-8, optionally
     * sets the 'filename' attribute, then transfers it to 'success' and commits.
     *
     * @throws ProcessException wrapping any failure; the session is rolled
     *         back first so a partially built flow file is not left in it.
     */
    @Override
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        def session = null
        try {
            session = sessionFactory.createSession()
            def flowFile = session.create()

            def hasEL = context.getProperty(CONTENT_HAS_EL).asBoolean()
            def contentProp = context.getProperty(CONTENT)
            // An unset 'File Content' property yields an empty flow file rather than an NPE.
            def content = (hasEL ? contentProp.evaluateAttributeExpressions().value : contentProp.value) ?: ''
            def filename = context.getProperty(FILENAME)?.evaluateAttributeExpressions()?.getValue()

            flowFile = session.write(flowFile, { outStream ->
                outStream.write(content.getBytes("UTF-8"))
            } as OutputStreamCallback)

            if (filename != null) {
                flowFile = session.putAttribute(flowFile, 'filename', filename)
            }

            session.transfer(flowFile, REL_SUCCESS)
            session.commit()
        } catch (e) {
            // Undo whatever was created in this session before reporting the failure.
            session?.rollback()
            throw new ProcessException(e)
        }
    }

    /** No custom validation is performed; an empty collection reports "no problems". */
    @Override
    Collection<ValidationResult> validate(ValidationContext context) { return [] }

    /** Maps a property name to its descriptor; returns null for unknown names. */
    @Override
    PropertyDescriptor getPropertyDescriptor(String name) {
        switch (name) {
            case 'File Content': return CONTENT
            case 'Evaluate Expressions in Content': return CONTENT_HAS_EL
            case 'Filename': return FILENAME
            default: return null
        }
    }

    @Override
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }

    @Override
    List<PropertyDescriptor> getPropertyDescriptors() { return [CONTENT, CONTENT_HAS_EL, FILENAME] as List }

    @Override
    String getIdentifier() { return 'GenerateFlowFile-InvokeScriptedProcessor' }
}

// InvokeScriptedProcessor picks up the instance through the 'processor' variable.
processor = new GenerateFlowFileWithContent()]]></value>
</entry>
<entry> | |
<key>Module Directory</key> | |
</entry> | |
<entry> | |
<key>File Content</key> | |
<value>2016-05-04 03:02:01.001000+0000;0; | |
2016-05-04 03:02:01.002000+0000;0.1234; | |
2016-05-04 03:02:01.003000+0000;0.2345;</value> | |
</entry> | |
<entry> | |
<key>Evaluate Expressions in Content</key> | |
<value>false</value> | |
</entry> | |
<entry> | |
<key>Filename</key> | |
<value>station1_sensor2.csv</value> | |
</entry> | |
</properties> | |
<runDurationMillis>0</runDurationMillis> | |
<schedulingPeriod>30 sec</schedulingPeriod> | |
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> | |
<yieldDuration>1 sec</yieldDuration> | |
</config> | |
<name>GenerateFlowFileWithContent</name> | |
<relationships> | |
<autoTerminate>false</autoTerminate> | |
<name>success</name> | |
</relationships> | |
<style></style> | |
<type>org.apache.nifi.processors.script.InvokeScriptedProcessor</type> | |
</processors> | |
<processors> | |
<id>d6abbdf3-0156-1000-0000-000000000000</id> | |
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId> | |
<position> | |
<x>4.0</x> | |
<y>391.99999618530273</y> | |
</position> | |
<config> | |
<bulletinLevel>WARN</bulletinLevel> | |
<comments></comments> | |
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount> | |
<descriptors> | |
<entry> | |
<key>Line Split Count</key> | |
<value> | |
<name>Line Split Count</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Maximum Fragment Size</key> | |
<value> | |
<name>Maximum Fragment Size</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Header Line Count</key> | |
<value> | |
<name>Header Line Count</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Header Line Marker Characters</key> | |
<value> | |
<name>Header Line Marker Characters</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Remove Trailing Newlines</key> | |
<value> | |
<name>Remove Trailing Newlines</name> | |
</value> | |
</entry> | |
</descriptors> | |
<lossTolerant>false</lossTolerant> | |
<penaltyDuration>30 sec</penaltyDuration> | |
<properties> | |
<entry> | |
<key>Line Split Count</key> | |
<value>1</value> | |
</entry> | |
<entry> | |
<key>Maximum Fragment Size</key> | |
</entry> | |
<entry> | |
<key>Header Line Count</key> | |
<value>0</value> | |
</entry> | |
<entry> | |
<key>Header Line Marker Characters</key> | |
</entry> | |
<entry> | |
<key>Remove Trailing Newlines</key> | |
<value>true</value> | |
</entry> | |
</properties> | |
<runDurationMillis>0</runDurationMillis> | |
<schedulingPeriod>0 sec</schedulingPeriod> | |
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> | |
<yieldDuration>1 sec</yieldDuration> | |
</config> | |
<name>Split CSV Lines</name> | |
<relationships> | |
<autoTerminate>true</autoTerminate> | |
<name>failure</name> | |
</relationships> | |
<relationships> | |
<autoTerminate>true</autoTerminate> | |
<name>original</name> | |
</relationships> | |
<relationships> | |
<autoTerminate>false</autoTerminate> | |
<name>splits</name> | |
</relationships> | |
<style></style> | |
<type>org.apache.nifi.processors.standard.SplitText</type> | |
</processors> | |
<processors> | |
<id>d6ad3cb7-0156-1000-0000-000000000000</id> | |
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId> | |
<position> | |
<x>1.0</x> | |
<y>612.9999961853027</y> | |
</position> | |
<config> | |
<bulletinLevel>WARN</bulletinLevel> | |
<comments></comments> | |
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount> | |
<descriptors> | |
<entry> | |
<key>Character Set</key> | |
<value> | |
<name>Character Set</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Maximum Buffer Size</key> | |
<value> | |
<name>Maximum Buffer Size</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Maximum Capture Group Length</key> | |
<value> | |
<name>Maximum Capture Group Length</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Enable Canonical Equivalence</key> | |
<value> | |
<name>Enable Canonical Equivalence</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Enable Case-insensitive Matching</key> | |
<value> | |
<name>Enable Case-insensitive Matching</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Permit Whitespace and Comments in Pattern</key> | |
<value> | |
<name>Permit Whitespace and Comments in Pattern</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Enable DOTALL Mode</key> | |
<value> | |
<name>Enable DOTALL Mode</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Enable Literal Parsing of the Pattern</key> | |
<value> | |
<name>Enable Literal Parsing of the Pattern</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Enable Multiline Mode</key> | |
<value> | |
<name>Enable Multiline Mode</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Enable Unicode-aware Case Folding</key> | |
<value> | |
<name>Enable Unicode-aware Case Folding</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Enable Unicode Predefined Character Classes</key> | |
<value> | |
<name>Enable Unicode Predefined Character Classes</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Enable Unix Lines Mode</key> | |
<value> | |
<name>Enable Unix Lines Mode</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Include Capture Group 0</key> | |
<value> | |
<name>Include Capture Group 0</name> | |
</value> | |
</entry> | |
<entry> | |
<key>column</key> | |
<value> | |
<name>column</name> | |
</value> | |
</entry> | |
</descriptors> | |
<lossTolerant>false</lossTolerant> | |
<penaltyDuration>30 sec</penaltyDuration> | |
<properties> | |
<entry> | |
<key>Character Set</key> | |
<value>UTF-8</value> | |
</entry> | |
<entry> | |
<key>Maximum Buffer Size</key> | |
<value>1 MB</value> | |
</entry> | |
<entry> | |
<key>Maximum Capture Group Length</key> | |
<value>1024</value> | |
</entry> | |
<entry> | |
<key>Enable Canonical Equivalence</key> | |
<value>false</value> | |
</entry> | |
<entry> | |
<key>Enable Case-insensitive Matching</key> | |
<value>false</value> | |
</entry> | |
<entry> | |
<key>Permit Whitespace and Comments in Pattern</key> | |
<value>false</value> | |
</entry> | |
<entry> | |
<key>Enable DOTALL Mode</key> | |
<value>false</value> | |
</entry> | |
<entry> | |
<key>Enable Literal Parsing of the Pattern</key> | |
<value>false</value> | |
</entry> | |
<entry> | |
<key>Enable Multiline Mode</key> | |
<value>false</value> | |
</entry> | |
<entry> | |
<key>Enable Unicode-aware Case Folding</key> | |
<value>false</value> | |
</entry> | |
<entry> | |
<key>Enable Unicode Predefined Character Classes</key> | |
<value>false</value> | |
</entry> | |
<entry> | |
<key>Enable Unix Lines Mode</key> | |
<value>false</value> | |
</entry> | |
<entry> | |
<key>Include Capture Group 0</key> | |
<value>true</value> | |
</entry> | |
<entry> | |
<key>column</key> | |
<value>([^;]*?);([^;]*?);([^;]*?)</value> | |
</entry> | |
</properties> | |
<runDurationMillis>0</runDurationMillis> | |
<schedulingPeriod>0 sec</schedulingPeriod> | |
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> | |
<yieldDuration>1 sec</yieldDuration> | |
</config> | |
<name>Extract Column Values</name> | |
<relationships> | |
<autoTerminate>false</autoTerminate> | |
<name>matched</name> | |
</relationships> | |
<relationships> | |
<autoTerminate>true</autoTerminate> | |
<name>unmatched</name> | |
</relationships> | |
<style></style> | |
<type>org.apache.nifi.processors.standard.ExtractText</type> | |
</processors> | |
<processors> | |
<id>d6ae76e5-0156-1000-0000-000000000000</id> | |
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId> | |
<position> | |
<x>4.0</x> | |
<y>203.0000114440918</y> | |
</position> | |
<config> | |
<bulletinLevel>WARN</bulletinLevel> | |
<comments></comments> | |
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount> | |
<descriptors> | |
<entry> | |
<key>Delete Attributes Expression</key> | |
<value> | |
<name>Delete Attributes Expression</name> | |
</value> | |
</entry> | |
<entry> | |
<key>sensor.name</key> | |
<value> | |
<name>sensor.name</name> | |
</value> | |
</entry> | |
<entry> | |
<key>station.name</key> | |
<value> | |
<name>station.name</name> | |
</value> | |
</entry> | |
</descriptors> | |
<lossTolerant>false</lossTolerant> | |
<penaltyDuration>30 sec</penaltyDuration> | |
<properties> | |
<entry> | |
<key>Delete Attributes Expression</key> | |
</entry> | |
<entry> | |
<key>sensor.name</key> | |
<value>${filename:substringAfter('_'):substringBefore('.csv')}</value> | |
</entry> | |
<entry> | |
<key>station.name</key> | |
<value>${filename:substringBefore('_')}</value> | |
</entry> | |
</properties> | |
<runDurationMillis>0</runDurationMillis> | |
<schedulingPeriod>0 sec</schedulingPeriod> | |
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> | |
<yieldDuration>1 sec</yieldDuration> | |
</config> | |
<name>Parse Filename</name> | |
<relationships> | |
<autoTerminate>false</autoTerminate> | |
<name>success</name> | |
</relationships> | |
<style></style> | |
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type> | |
</processors> | |
<processors> | |
<id>d6b4e56f-0156-1000-0000-000000000000</id> | |
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId> | |
<position> | |
<x>656.0</x> | |
<y>0.0</y> | |
</position> | |
<config> | |
<bulletinLevel>WARN</bulletinLevel> | |
<comments></comments> | |
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount> | |
<descriptors> | |
<entry> | |
<key>Delete Attributes Expression</key> | |
<value> | |
<name>Delete Attributes Expression</name> | |
</value> | |
</entry> | |
<entry> | |
<key>tps</key> | |
<value> | |
<name>tps</name> | |
</value> | |
</entry> | |
</descriptors> | |
<lossTolerant>false</lossTolerant> | |
<penaltyDuration>30 sec</penaltyDuration> | |
<properties> | |
<entry> | |
<key>Delete Attributes Expression</key> | |
</entry> | |
<entry>
<!-- Keeps the first three fractional-second digits of column.1 and drops the
     rest, e.g. "03:02:01.001000+0000" becomes "03:02:01.001+0000".
     The previous toDate('...ss.SSSSSSZ') call read the six digits as a
     millisecond count (001000 = 1000 ms), which moved the timestamp forward
     by whole seconds and left the fraction at .000. -->
<key>tps</key>
<value>${column.1:replaceAll('([.][0-9][0-9][0-9])[0-9]+', '$1')}</value>
</entry>
</properties> | |
<runDurationMillis>0</runDurationMillis> | |
<schedulingPeriod>0 sec</schedulingPeriod> | |
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> | |
<yieldDuration>1 sec</yieldDuration> | |
</config> | |
<name>Convert Timestamp</name> | |
<relationships> | |
<autoTerminate>false</autoTerminate> | |
<name>success</name> | |
</relationships> | |
<style></style> | |
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type> | |
</processors> | |
<processors> | |
<id>d6be429d-0156-1000-0000-000000000000</id> | |
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId> | |
<position> | |
<x>652.9999999999999</x> | |
<y>245.99999618530273</y> | |
</position> | |
<config> | |
<bulletinLevel>WARN</bulletinLevel> | |
<comments></comments> | |
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount> | |
<descriptors> | |
<entry> | |
<key>Regular Expression</key> | |
<value> | |
<name>Regular Expression</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Replacement Value</key> | |
<value> | |
<name>Replacement Value</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Character Set</key> | |
<value> | |
<name>Character Set</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Maximum Buffer Size</key> | |
<value> | |
<name>Maximum Buffer Size</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Replacement Strategy</key> | |
<value> | |
<name>Replacement Strategy</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Evaluation Mode</key> | |
<value> | |
<name>Evaluation Mode</name> | |
</value> | |
</entry> | |
</descriptors> | |
<lossTolerant>false</lossTolerant> | |
<penaltyDuration>30 sec</penaltyDuration> | |
<properties> | |
<entry> | |
<key>Regular Expression</key> | |
<value>(?s:^.*$)</value> | |
</entry> | |
<entry> | |
<key>Replacement Value</key> | |
<value>insert into sensor_data (station_id, sensor_id, tps, val) values ('${station.name}', '${sensor.name}', '${tps}', ${column.2})</value> | |
</entry> | |
<entry> | |
<key>Character Set</key> | |
<value>UTF-8</value> | |
</entry> | |
<entry> | |
<key>Maximum Buffer Size</key> | |
<value>1 MB</value> | |
</entry> | |
<entry> | |
<key>Replacement Strategy</key> | |
<value>Regex Replace</value> | |
</entry> | |
<entry> | |
<key>Evaluation Mode</key> | |
<value>Entire text</value> | |
</entry> | |
</properties> | |
<runDurationMillis>0</runDurationMillis> | |
<schedulingPeriod>0 sec</schedulingPeriod> | |
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> | |
<yieldDuration>1 sec</yieldDuration> | |
</config> | |
<name>ReplaceText</name> | |
<relationships> | |
<autoTerminate>true</autoTerminate> | |
<name>failure</name> | |
</relationships> | |
<relationships> | |
<autoTerminate>false</autoTerminate> | |
<name>success</name> | |
</relationships> | |
<style></style> | |
<type>org.apache.nifi.processors.standard.ReplaceText</type> | |
</processors> | |
<processors> | |
<id>d794ca20-98c5-4fc5-0000-000000000000</id> | |
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId> | |
<position> | |
<x>503.4999999999999</x> | |
<y>687.7600114440918</y> | |
</position> | |
<config> | |
<bulletinLevel>WARN</bulletinLevel> | |
<comments></comments> | |
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount> | |
<descriptors> | |
<entry> | |
<key>Script Engine</key> | |
<value> | |
<name>Script Engine</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Script File</key> | |
<value> | |
<name>Script File</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Script Body</key> | |
<value> | |
<name>Script Body</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Module Directory</key> | |
<value> | |
<name>Module Directory</name> | |
</value> | |
</entry> | |
</descriptors> | |
<lossTolerant>false</lossTolerant> | |
<penaltyDuration>30 sec</penaltyDuration> | |
<properties> | |
<entry> | |
<key>Script Engine</key> | |
<value>Groovy</value> | |
</entry> | |
<entry> | |
<key>Script File</key> | |
</entry> | |
<entry>
<key>Script Body</key>
<value>// Helper script for ExecuteScript: emits one flow file per DDL statement so a
// downstream processor can execute them one by one.
// WARNING: the second statement drops sensor_data, so every run of this script
// discards existing rows. Run it once, then stop or remove this processor.
//
// Statements are kept as an explicit list (not split from a multi-line string)
// so that wrapping a long statement cannot break it into invalid fragments.
def ddlStatements = [
    "CREATE KEYSPACE IF NOT EXISTS data with replication = {'class' : 'SimpleStrategy', 'replication_factor' : 3};",
    "DROP TABLE IF EXISTS sensor_data;",
    "CREATE TABLE sensor_data (station_id text, sensor_id text, tps timestamp, val float, PRIMARY KEY ((station_id, sensor_id), tps));"
]

ddlStatements.each { statement ->
    // 'session' and 'REL_SUCCESS' are not defined in this script; they are
    // presumably bound by ExecuteScript -- confirm for your NiFi version.
    def ddlFlowFile = session.create()
    ddlFlowFile = session.write(ddlFlowFile, { outStream ->
        outStream.write(statement.getBytes('UTF-8'))
    } as OutputStreamCallback)
    session.transfer(ddlFlowFile, REL_SUCCESS)
}</value>
</entry>
<entry> | |
<key>Module Directory</key> | |
</entry> | |
</properties> | |
<runDurationMillis>0</runDurationMillis> | |
<schedulingPeriod>60 sec</schedulingPeriod> | |
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> | |
<yieldDuration>1 sec</yieldDuration> | |
</config> | |
<name>ExecuteScript</name> | |
<relationships> | |
<autoTerminate>true</autoTerminate> | |
<name>failure</name> | |
</relationships> | |
<relationships> | |
<autoTerminate>false</autoTerminate> | |
<name>success</name> | |
</relationships> | |
<style> | |
<entry> | |
<key>background-color</key> | |
<value>#000000</value> | |
</entry> | |
</style> | |
<type>org.apache.nifi.processors.script.ExecuteScript</type> | |
</processors> | |
<processors> | |
<id>7969ae1c-8754-4f49-0000-000000000000</id> | |
<parentGroupId>d6aa94a0-0156-1000-0000-000000000000</parentGroupId> | |
<position> | |
<x>654.9999999999999</x> | |
<y>472.73999618530274</y> | |
</position> | |
<config> | |
<bulletinLevel>WARN</bulletinLevel> | |
<comments></comments> | |
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount> | |
<descriptors> | |
<entry> | |
<key>Cassandra Contact Points</key> | |
<value> | |
<name>Cassandra Contact Points</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Keyspace</key> | |
<value> | |
<name>Keyspace</name> | |
</value> | |
</entry> | |
<entry> | |
<key>SSL Context Service</key> | |
<value> | |
<identifiesControllerService>org.apache.nifi.ssl.SSLContextService</identifiesControllerService> | |
<name>SSL Context Service</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Client Auth</key> | |
<value> | |
<name>Client Auth</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Username</key> | |
<value> | |
<name>Username</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Password</key> | |
<value> | |
<name>Password</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Consistency Level</key> | |
<value> | |
<name>Consistency Level</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Character Set</key> | |
<value> | |
<name>Character Set</name> | |
</value> | |
</entry> | |
<entry> | |
<key>Max Wait Time</key> | |
<value> | |
<name>Max Wait Time</name> | |
</value> | |
</entry> | |
</descriptors> | |
<lossTolerant>false</lossTolerant> | |
<penaltyDuration>30 sec</penaltyDuration> | |
<properties> | |
<entry> | |
<key>Cassandra Contact Points</key> | |
<value>127.0.0.1:9042</value> | |
</entry> | |
<entry> | |
<key>Keyspace</key> | |
<value>data</value> | |
</entry> | |
<entry> | |
<key>SSL Context Service</key> | |
</entry> | |
<entry> | |
<key>Client Auth</key> | |
<value>REQUIRED</value> | |
</entry> | |
<entry> | |
<key>Username</key> | |
<value>cassandra</value> | |
</entry> | |
<entry> | |
<key>Password</key> | |
</entry> | |
<entry> | |
<key>Consistency Level</key> | |
<value>ONE</value> | |
</entry> | |
<entry> | |
<key>Character Set</key> | |
<value>UTF-8</value> | |
</entry> | |
<entry> | |
<key>Max Wait Time</key> | |
<value>0 seconds</value> | |
</entry> | |
</properties> | |
<runDurationMillis>0</runDurationMillis> | |
<schedulingPeriod>0 sec</schedulingPeriod> | |
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> | |
<yieldDuration>1 sec</yieldDuration> | |
</config> | |
<name>PutCassandraQL</name> | |
<relationships> | |
<autoTerminate>true</autoTerminate> | |
<name>failure</name> | |
</relationships> | |
<relationships> | |
<autoTerminate>true</autoTerminate> | |
<name>retry</name> | |
</relationships> | |
<relationships> | |
<autoTerminate>true</autoTerminate> | |
<name>success</name> | |
</relationships> | |
<style> | |
<entry> | |
<key>background-color</key> | |
<value>#6febb9</value> | |
</entry> | |
</style> | |
<type>org.apache.nifi.processors.cassandra.PutCassandraQL</type> | |
</processors> | |
</snippet> | |
<timestamp>08/29/2016 11:39:23 EDT</timestamp> | |
</template> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment