Skip to content

Instantly share code, notes, and snippets.

@lostinsoftware
Created January 12, 2019 18:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lostinsoftware/01929f57091c5bda055313866d84e045 to your computer and use it in GitHub Desktop.
Save lostinsoftware/01929f57091c5bda055313866d84e045 to your computer and use it in GitHub Desktop.
Saving data in InfluxDB using Apache NiFi
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<template encoding-version="1.2">
<description>Saving data in InfluxDB using Apache NiFi</description>
<groupId>2493adde-0168-1000-b6d3-8df5b44b7ea7</groupId>
<name>SavingDataInInfluxDB</name>
<snippet>
<connections>
<id>5b3de893-02ab-31c2-0000-000000000000</id>
<parentGroupId>5c0ab693-fb2e-3f91-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>5c0ab693-fb2e-3f91-0000-000000000000</groupId>
<id>b3017383-a694-3be6-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus>
<loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>5c0ab693-fb2e-3f91-0000-000000000000</groupId>
<id>e9f70399-93e1-3303-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>5dbb7aae-9676-3abf-0000-000000000000</id>
<parentGroupId>5c0ab693-fb2e-3f91-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>5c0ab693-fb2e-3f91-0000-000000000000</groupId>
<id>e9f70399-93e1-3303-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus>
<loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>5c0ab693-fb2e-3f91-0000-000000000000</groupId>
<id>8c7f19e6-2252-3dbe-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>873b718d-0d27-3ad5-0000-000000000000</id>
<parentGroupId>5c0ab693-fb2e-3f91-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>5c0ab693-fb2e-3f91-0000-000000000000</groupId>
<id>8c7f19e6-2252-3dbe-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
<loadBalancePartitionAttribute></loadBalancePartitionAttribute>
<loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus>
<loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy>
<name></name>
<selectedRelationships>Success</selectedRelationships>
<source>
<groupId>5c0ab693-fb2e-3f91-0000-000000000000</groupId>
<id>cedcb6d0-6731-38ef-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<processors>
<id>8c7f19e6-2252-3dbe-0000-000000000000</id>
<parentGroupId>5c0ab693-fb2e-3f91-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>290.32201729775727</y>
</position>
<bundle>
<artifact>nifi-scripting-nar</artifact>
<group>org.apache.nifi</group>
<version>1.8.0</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Script Engine</key>
<value>
<name>Script Engine</name>
</value>
</entry>
<entry>
<key>Script File</key>
<value>
<name>Script File</name>
</value>
</entry>
<entry>
<key>Script Body</key>
<value>
<name>Script Body</name>
</value>
</entry>
<entry>
<key>Module Directory</key>
<value>
<name>Module Directory</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Script Engine</key>
<value>Groovy</value>
</entry>
<entry>
<key>Script File</key>
</entry>
<entry>
<key>Script Body</key>
<value>import java.nio.charset.StandardCharsets
def flowFile = session.get()
if(!flowFile) return
flowFile = session.write(flowFile, {inputStream, outputStream -&gt;
String result = ''
String ts = ''
inputStream.eachLine { line -&gt;
a = line.tokenize(',')
ts = a[1]
result += a[0] + "=" + a[2] + ","
}
result = result.substring(0, result.length()-1) + ' ' + ts + '000000'
outputStream.write(result.getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)</value>
</entry>
<entry>
<key>Module Directory</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>Conver to InfluxDB line protocol</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>STOPPED</state>
<style/>
<type>org.apache.nifi.processors.script.ExecuteScript</type>
</processors>
<processors>
<id>b3017383-a694-3be6-0000-000000000000</id>
<parentGroupId>5c0ab693-fb2e-3f91-0000-000000000000</parentGroupId>
<position>
<x>569.8002392023395</x>
<y>294.2278177881103</y>
</position>
<bundle>
<artifact>nifi-influxdb-nar</artifact>
<group>org.apache.nifi</group>
<version>1.8.0</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>influxdb-dbname</key>
<value>
<name>influxdb-dbname</name>
</value>
</entry>
<entry>
<key>influxdb-url</key>
<value>
<name>influxdb-url</name>
</value>
</entry>
<entry>
<key>InfluxDB Max Connection Time Out (seconds)</key>
<value>
<name>InfluxDB Max Connection Time Out (seconds)</name>
</value>
</entry>
<entry>
<key>influxdb-username</key>
<value>
<name>influxdb-username</name>
</value>
</entry>
<entry>
<key>influxdb-password</key>
<value>
<name>influxdb-password</name>
</value>
</entry>
<entry>
<key>influxdb-charset</key>
<value>
<name>influxdb-charset</name>
</value>
</entry>
<entry>
<key>influxdb-consistency-level</key>
<value>
<name>influxdb-consistency-level</name>
</value>
</entry>
<entry>
<key>influxdb-retention-policy</key>
<value>
<name>influxdb-retention-policy</name>
</value>
</entry>
<entry>
<key>influxdb-max-records-size</key>
<value>
<name>influxdb-max-records-size</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>influxdb-dbname</key>
<value>test1</value>
</entry>
<entry>
<key>influxdb-url</key>
<value>http://localhost:8086</value>
</entry>
<entry>
<key>InfluxDB Max Connection Time Out (seconds)</key>
<value>0 seconds</value>
</entry>
<entry>
<key>influxdb-username</key>
</entry>
<entry>
<key>influxdb-password</key>
</entry>
<entry>
<key>influxdb-charset</key>
<value>UTF-8</value>
</entry>
<entry>
<key>influxdb-consistency-level</key>
<value>ONE</value>
</entry>
<entry>
<key>influxdb-retention-policy</key>
<value>autogen</value>
</entry>
<entry>
<key>influxdb-max-records-size</key>
<value>1 MB</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>PutInfluxDB</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure-max-size</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>retry</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>success</name>
</relationships>
<state>STOPPED</state>
<style/>
<type>org.apache.nifi.processors.influxdb.PutInfluxDB</type>
</processors>
<processors>
<id>cedcb6d0-6731-38ef-0000-000000000000</id>
<parentGroupId>5c0ab693-fb2e-3f91-0000-000000000000</parentGroupId>
<position>
<x>0.8402386043251227</x>
<y>0.0</y>
</position>
<bundle>
<artifact>nifi-simulator-bundle-nar</artifact>
<group>com.hashmap</group>
<version>1.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>SIMULATOR_CONFIG</key>
<value>
<name>SIMULATOR_CONFIG</name>
</value>
</entry>
<entry>
<key>PRINT_HEADER</key>
<value>
<name>PRINT_HEADER</name>
</value>
</entry>
<entry>
<key>LONG_TIMESTAMP</key>
<value>
<name>LONG_TIMESTAMP</name>
</value>
</entry>
<entry>
<key>TIMEZONE</key>
<value>
<name>TIMEZONE</name>
</value>
</entry>
<entry>
<key>DATA_FORMAT</key>
<value>
<name>DATA_FORMAT</name>
</value>
</entry>
<entry>
<key>DEVICE_NAME</key>
<value>
<name>DEVICE_NAME</name>
</value>
</entry>
<entry>
<key>JSON_DEVICE_TYPE</key>
<value>
<name>JSON_DEVICE_TYPE</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>SIMULATOR_CONFIG</key>
<value>D:\nifi-1.8.0\config\temperature.json</value>
</entry>
<entry>
<key>PRINT_HEADER</key>
<value>false</value>
</entry>
<entry>
<key>LONG_TIMESTAMP</key>
<value>true</value>
</entry>
<entry>
<key>TIMEZONE</key>
<value>Europe/Madrid</value>
</entry>
<entry>
<key>DATA_FORMAT</key>
<value>CSV</value>
</entry>
<entry>
<key>DEVICE_NAME</key>
</entry>
<entry>
<key>JSON_DEVICE_TYPE</key>
<value>Device</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>2 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>GenerateTimeSeriesFlowFile</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>Success</name>
</relationships>
<state>STOPPED</state>
<style/>
<type>com.hashmap.tempus.processors.GenerateTimeSeriesFlowFile</type>
</processors>
<processors>
<id>e9f70399-93e1-3303-0000-000000000000</id>
<parentGroupId>5c0ab693-fb2e-3f91-0000-000000000000</parentGroupId>
<position>
<x>567.9008802342521</x>
<y>18.602558411545488</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.8.0</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Regular Expression</key>
<value>
<name>Regular Expression</name>
</value>
</entry>
<entry>
<key>Replacement Value</key>
<value>
<name>Replacement Value</name>
</value>
</entry>
<entry>
<key>Character Set</key>
<value>
<name>Character Set</name>
</value>
</entry>
<entry>
<key>Maximum Buffer Size</key>
<value>
<name>Maximum Buffer Size</name>
</value>
</entry>
<entry>
<key>Replacement Strategy</key>
<value>
<name>Replacement Strategy</name>
</value>
</entry>
<entry>
<key>Evaluation Mode</key>
<value>
<name>Evaluation Mode</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Regular Expression</key>
<value>(?s)(^.*$)</value>
</entry>
<entry>
<key>Replacement Value</key>
<value>${measurement},machine=${machine} $1</value>
</entry>
<entry>
<key>Character Set</key>
<value>UTF-8</value>
</entry>
<entry>
<key>Maximum Buffer Size</key>
<value>1 MB</value>
</entry>
<entry>
<key>Replacement Strategy</key>
<value>Regex Replace</value>
</entry>
<entry>
<key>Evaluation Mode</key>
<value>Entire text</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<executionNodeRestricted>false</executionNodeRestricted>
<name>ReplaceText</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>STOPPED</state>
<style/>
<type>org.apache.nifi.processors.standard.ReplaceText</type>
</processors>
</snippet>
<timestamp>10/27/2018 19:00:06 CET</timestamp>
</template>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment