Created August 29, 2016 15:40
Gist: mattyb149/f9bd1a8598eaf13db50f9cd73b8a3e06
Apache NiFi 1.0 template to convert CSV files to Cassandra Query Language (CQL) statements and execute them
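The template below spreads its work across several processors ("Split CSV Lines", "Extract Column Values", "Parse Filename", and the INSERT-building step), so a plain-code model of what happens to one file may help when reading it. The Java sketch that follows is illustrative only and is not part of the template. It assumes the filename splits on `_` into a station ID and a sensor ID, and that each row has the form `<timestamp>;<value>;` as in the generated test content; the actual regular expression and attribute expressions used by the flow are not shown on this page. `CsvToCql`, `parseFilename` and `toInserts` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-alone model of the flow's per-file processing (not NiFi code).
final class CsvToCql {
    // Assumed row layout: "<timestamp>;<value>;" (semicolon-delimited, trailing semicolon)
    private static final Pattern ROW = Pattern.compile("^([^;]+);([^;]+);$");

    // "station1_sensor2.csv" -> {"station1", "sensor2"} (assumed: drop ".csv", split on the first '_')
    static String[] parseFilename(String filename) {
        String base = filename.endsWith(".csv") ? filename.substring(0, filename.length() - 4) : filename;
        String[] parts = base.split("_", 2);
        if (parts.length != 2) {
            throw new IllegalArgumentException("Unexpected filename: " + filename);
        }
        return parts;
    }

    // One INSERT per matching line, in the column order used by the template's Replacement Value
    static List<String> toInserts(String filename, String content) {
        String[] ids = parseFilename(filename);
        List<String> statements = new ArrayList<>();
        for (String line : content.split("\n")) {
            Matcher m = ROW.matcher(line.trim());
            if (!m.matches()) {
                continue; // skip blank or malformed lines
            }
            statements.add(String.format(
                    "insert into sensor_data (station_id, sensor_id, tps, val) values ('%s', '%s', '%s', %s)",
                    ids[0], ids[1], m.group(1), m.group(2)));
        }
        return statements;
    }

    public static void main(String[] args) {
        String content = "2016-05-04 03:02:01.001000+0000;0;\n"
                + "2016-05-04 03:02:01.002000+0000;0.1234;\n";
        toInserts("station1_sensor2.csv", content).forEach(System.out::println);
    }
}
```

This model passes the timestamp through unchanged, whereas the flow reformats it first. Values are also spliced into the statement text without escaping, which only suits trusted data such as the generated test file; the prepared-statement route mentioned in the template's labels avoids that.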
<?xml version="1.0" ?>
<template encoding-version="1.0">
<description>This template describes a flow that processes a CSV file whose filename and content both contribute to the fields of a Cassandra table, then constructs and executes CQL statements.</description>
<backPressureDataSizeThreshold>0 MB</backPressureDataSizeThreshold>
<flowFileExpiration>0 sec</flowFileExpiration>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<flowFileExpiration>0 sec</flowFileExpiration>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<flowFileExpiration>0 sec</flowFileExpiration>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<flowFileExpiration>0 sec</flowFileExpiration>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<flowFileExpiration>0 sec</flowFileExpiration>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<flowFileExpiration>0 sec</flowFileExpiration>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<flowFileExpiration>0 sec</flowFileExpiration>
<label>Creates the keyspace, drops the table,
and creates the table. This is a helper
for the example flow; it should be run only
once, before the rest of the flow, and can
be removed if the keyspace and table already exist.
<label>Generates a test CSV file with content.
<label>The filename is station1_sensor2.csv;
its segments are parsed to get the station
and sensor fields for the CQL INSERTs.
<label>This attempts to strip the microseconds
from the timestamp, but ends up truncating
all fractional seconds.
<label>Builds a CQL INSERT statement with
explicit values. UpdateAttribute could
instead be used to set up prepared statements.
<label>Executes the CQL statements
<label>Uses regular expressions to
group the columns
<key>Script Engine</key>
<name>Script Engine</name>
<key>Script File</key>
<name>Script File</name>
<key>Script Body</key>
<name>Script Body</name>
<key>Module Directory</key>
<name>Module Directory</name>
<key>File Content</key>
<name>File Content</name>
<key>Evaluate Expressions in Content</key>
<name>Evaluate Expressions in Content</name>
<penaltyDuration>30 sec</penaltyDuration>
<key>Script Engine</key>
<key>Script File</key>
<key>Script Body</key>
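<value>import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.ValidationContext
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.components.Validator
import org.apache.nifi.processor.*
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.OutputStreamCallback

// Generates one flow file per trigger, with configurable content and (optionally) filename.
// The relationship name and the validator/allowable-value settings are assumed reconstructions.
class GenerateFlowFileWithContent implements Processor {
    def REL_SUCCESS = new Relationship.Builder()
            .name('success')
            .description('The flow file with the specified content and/or filename was successfully transferred')
            .build()
    def CONTENT = new PropertyDescriptor.Builder()
            .name('File Content').description('The content for the generated flow file')
            .required(false).expressionLanguageSupported(true).addValidator(Validator.VALID)
            .build()
    def CONTENT_HAS_EL = new PropertyDescriptor.Builder()
            .name('Evaluate Expressions in Content').description('Whether to evaluate NiFi Expression Language constructs within the content')
            .required(true).allowableValues('true', 'false').defaultValue('false')
            .build()
    def FILENAME = new PropertyDescriptor.Builder()
            .name('Filename').description('The name of the flow file to be stored in the filename attribute')
            .required(false).expressionLanguageSupported(true).addValidator(Validator.VALID)
            .build()

    void initialize(ProcessorInitializationContext context) { }
    Set&lt;Relationship&gt; getRelationships() { return [REL_SUCCESS] as Set }

    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        def session = sessionFactory.createSession()
        try {
            def flowFile = session.create()
            // Evaluate Expression Language in the content only if requested
            def hasEL = context.getProperty(CONTENT_HAS_EL).asBoolean()
            def contentProp = context.getProperty(CONTENT)
            def content = (hasEL ? contentProp.evaluateAttributeExpressions().value : contentProp.value) ?: ''
            def filename = context.getProperty(FILENAME)?.evaluateAttributeExpressions()?.getValue()
            // Write the content to the new flow file
            flowFile = session.write(flowFile, { outStream -&gt;
                outStream.write(content.getBytes('UTF-8'))
            } as OutputStreamCallback)
            if (filename != null) { flowFile = session.putAttribute(flowFile, 'filename', filename) }
            // Transfer, then commit: a session created from the factory must be committed explicitly
            session.transfer(flowFile, REL_SUCCESS)
            session.commit()
        } catch (e) {
            session.rollback(true)
            throw new ProcessException(e)
        }
    }

    Collection&lt;ValidationResult&gt; validate(ValidationContext context) { return null }

    PropertyDescriptor getPropertyDescriptor(String name) {
        switch (name) {
            case 'File Content': return CONTENT
            case 'Evaluate Expressions in Content': return CONTENT_HAS_EL
            case 'Filename': return FILENAME
            default: return null
        }
    }

    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
    List&lt;PropertyDescriptor&gt; getPropertyDescriptors() { return [CONTENT, CONTENT_HAS_EL, FILENAME] as List }
    String getIdentifier() { return 'GenerateFlowFile-InvokeScriptedProcessor' }
}

processor = new GenerateFlowFileWithContent()</value>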
<key>Module Directory</key>
<key>File Content</key>
<value>2016-05-04 03:02:01.001000+0000;0;
2016-05-04 03:02:01.002000+0000;0.1234;
2016-05-04 03:02:01.003000+0000;0.2345;</value>
<key>Evaluate Expressions in Content</key>
<schedulingPeriod>30 sec</schedulingPeriod>
<yieldDuration>1 sec</yieldDuration>
<key>Line Split Count</key>
<name>Line Split Count</name>
<key>Maximum Fragment Size</key>
<name>Maximum Fragment Size</name>
<key>Header Line Count</key>
<name>Header Line Count</name>
<key>Header Line Marker Characters</key>
<name>Header Line Marker Characters</name>
<key>Remove Trailing Newlines</key>
<name>Remove Trailing Newlines</name>
<penaltyDuration>30 sec</penaltyDuration>
<key>Line Split Count</key>
<key>Maximum Fragment Size</key>
<key>Header Line Count</key>
<key>Header Line Marker Characters</key>
<key>Remove Trailing Newlines</key>
<schedulingPeriod>0 sec</schedulingPeriod>
<yieldDuration>1 sec</yieldDuration>
<name>Split CSV Lines</name>
<key>Character Set</key>
<name>Character Set</name>
<key>Maximum Buffer Size</key>
<name>Maximum Buffer Size</name>
<key>Maximum Capture Group Length</key>
<name>Maximum Capture Group Length</name>
<key>Enable Canonical Equivalence</key>
<name>Enable Canonical Equivalence</name>
<key>Enable Case-insensitive Matching</key>
<name>Enable Case-insensitive Matching</name>
<key>Permit Whitespace and Comments in Pattern</key>
<name>Permit Whitespace and Comments in Pattern</name>
<key>Enable DOTALL Mode</key>
<name>Enable DOTALL Mode</name>
<key>Enable Literal Parsing of the Pattern</key>
<name>Enable Literal Parsing of the Pattern</name>
<key>Enable Multiline Mode</key>
<name>Enable Multiline Mode</name>
<key>Enable Unicode-aware Case Folding</key>
<name>Enable Unicode-aware Case Folding</name>
<key>Enable Unicode Predefined Character Classes</key>
<name>Enable Unicode Predefined Character Classes</name>
<key>Enable Unix Lines Mode</key>
<name>Enable Unix Lines Mode</name>
<key>Include Capture Group 0</key>
<name>Include Capture Group 0</name>
<penaltyDuration>30 sec</penaltyDuration>
<key>Character Set</key>
<key>Maximum Buffer Size</key>
<value>1 MB</value>
<key>Maximum Capture Group Length</key>
<key>Enable Canonical Equivalence</key>
<key>Enable Case-insensitive Matching</key>
<key>Permit Whitespace and Comments in Pattern</key>
<key>Enable DOTALL Mode</key>
<key>Enable Literal Parsing of the Pattern</key>
<key>Enable Multiline Mode</key>
<key>Enable Unicode-aware Case Folding</key>
<key>Enable Unicode Predefined Character Classes</key>
<key>Enable Unix Lines Mode</key>
<key>Include Capture Group 0</key>
<schedulingPeriod>0 sec</schedulingPeriod>
<yieldDuration>1 sec</yieldDuration>
<name>Extract Column Values</name>
<key>Delete Attributes Expression</key>
<name>Delete Attributes Expression</name>
<penaltyDuration>30 sec</penaltyDuration>
<key>Delete Attributes Expression</key>
<schedulingPeriod>0 sec</schedulingPeriod>
<yieldDuration>1 sec</yieldDuration>
<name>Parse Filename</name>
<key>Delete Attributes Expression</key>
<name>Delete Attributes Expression</name>
<penaltyDuration>30 sec</penaltyDuration>
<key>Delete Attributes Expression</key>
<value>${column.1:toDate('yyyy-MM-dd HH:mm:ss.SSSSSSZ'):format('yyyy-MM-dd HH:mm:ss.SSSZ')}</value>
<schedulingPeriod>0 sec</schedulingPeriod>
<yieldDuration>1 sec</yieldDuration>
<name>Convert Timestamp</name>
<key>Regular Expression</key>
<name>Regular Expression</name>
<key>Replacement Value</key>
<name>Replacement Value</name>
<key>Character Set</key>
<name>Character Set</name>
<key>Maximum Buffer Size</key>
<name>Maximum Buffer Size</name>
<key>Replacement Strategy</key>
<name>Replacement Strategy</name>
<key>Evaluation Mode</key>
<name>Evaluation Mode</name>
<penaltyDuration>30 sec</penaltyDuration>
<key>Regular Expression</key>
<key>Replacement Value</key>
<value>insert into sensor_data (station_id, sensor_id, tps, val) values ('${}', '${}', '${tps}', ${column.2})</value>
<key>Character Set</key>
<key>Maximum Buffer Size</key>
<value>1 MB</value>
<key>Replacement Strategy</key>
<value>Regex Replace</value>
<key>Evaluation Mode</key>
<value>Entire text</value>
<schedulingPeriod>0 sec</schedulingPeriod>
<yieldDuration>1 sec</yieldDuration>
<key>Script Engine</key>
<name>Script Engine</name>
<key>Script File</key>
<name>Script File</name>
<key>Script Body</key>
<name>Script Body</name>
<key>Module Directory</key>
<name>Module Directory</name>
<penaltyDuration>30 sec</penaltyDuration>
<key>Script Engine</key>
<key>Script File</key>
<key>Script Body</key>
<value>import org.apache.nifi.processor.io.OutputStreamCallback

// Emit each DDL statement (one per line) as its own flow file for the CQL executor
DDL =
"""CREATE KEYSPACE IF NOT EXISTS data with replication = {'class' : 'SimpleStrategy', 'replication_factor' : 3};
CREATE TABLE sensor_data (station_id text, sensor_id text, tps timestamp, val float, PRIMARY KEY ((station_id, sensor_id), tps));"""
DDL.eachLine { ddl -&gt;
    flowFile = session.create()
    flowFile = session.write(flowFile, { outStream -&gt;
        outStream.write(ddl.getBytes('UTF-8'))
    } as OutputStreamCallback)
    session.transfer(flowFile, REL_SUCCESS)
}</value>
<key>Module Directory</key>
<schedulingPeriod>60 sec</schedulingPeriod>
<yieldDuration>1 sec</yieldDuration>
<key>Cassandra Contact Points</key>
<name>Cassandra Contact Points</name>
<key>SSL Context Service</key>
<name>SSL Context Service</name>
<key>Client Auth</key>
<name>Client Auth</name>
<key>Consistency Level</key>
<name>Consistency Level</name>
<key>Character Set</key>
<name>Character Set</name>
<key>Max Wait Time</key>
<name>Max Wait Time</name>
<penaltyDuration>30 sec</penaltyDuration>
<key>Cassandra Contact Points</key>
<key>SSL Context Service</key>
<key>Client Auth</key>
<key>Consistency Level</key>
<key>Character Set</key>
<key>Max Wait Time</key>
<value>0 seconds</value>
<schedulingPeriod>0 sec</schedulingPeriod>
<yieldDuration>1 sec</yieldDuration>
<timestamp>08/29/2016 11:39:23 EDT</timestamp>
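The template's "Convert Timestamp" step, whose label notes that fractional seconds end up truncated, can be reproduced outside NiFi. The Java sketch below assumes that the Expression Language functions `toDate()` and `format()` are backed by `java.text.SimpleDateFormat`, and pins the time zone and locale so the output is deterministic (the template itself uses the JVM defaults). In `SimpleDateFormat` the `S` letter means milliseconds, so `SSSSSS` reads the digits `001000` as 1000 ms; the default lenient calendar carries that into the seconds field. The result therefore shows `.000` and a shifted second rather than a plain truncation. `TimestampConversion`, `viaSimpleDateFormat` and `dropMicros` are hypothetical names, and `dropMicros` is a swapped-in text-based alternative, not what the template does.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

// Hypothetical reproduction of the "Convert Timestamp" expression,
// assuming toDate()/format() delegate to java.text.SimpleDateFormat.
final class TimestampConversion {
    private static SimpleDateFormat utc(String pattern) {
        SimpleDateFormat f = new SimpleDateFormat(pattern, Locale.ROOT);
        f.setTimeZone(TimeZone.getTimeZone("UTC")); // pin zone and locale for deterministic output
        return f;
    }

    // Same patterns as the template. 'S' is milliseconds, so "001000" parses as 1000 ms,
    // which the lenient (default) calendar carries over into the seconds field.
    static String viaSimpleDateFormat(String ts) {
        try {
            Date d = utc("yyyy-MM-dd HH:mm:ss.SSSSSSZ").parse(ts);
            return utc("yyyy-MM-dd HH:mm:ss.SSSZ").format(d);
        } catch (ParseException e) {
            throw new IllegalArgumentException("Unparseable timestamp: " + ts, e);
        }
    }

    // Swapped-in alternative: keep the first three fractional digits as text and drop the rest.
    static String dropMicros(String ts) {
        return ts.replaceFirst("(\\.\\d{3})\\d{3}(?=[+-]\\d{4}$)", "$1");
    }

    public static void main(String[] args) {
        String ts = "2016-05-04 03:02:01.001000+0000";
        System.out.println(viaSimpleDateFormat(ts)); // 2016-05-04 03:02:02.000+0000
        System.out.println(dropMicros(ts));          // 2016-05-04 03:02:01.001+0000
    }
}
```

Applied to the third sample row (`.003000`), the SimpleDateFormat route yields `03:02:04.000`, three seconds after the original time, while the text-based trim keeps `03:02:01.003`.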