Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save ijokarumawak/45a609936239a0cdfc4599a576386d42 to your computer and use it in GitHub Desktop.
Save ijokarumawak/45a609936239a0cdfc4599a576386d42 to your computer and use it in GitHub Desktop.
NiFi example: Calculate Min, Max and Avg in a CSV file

NiFi example: Calculate Min, Max and Avg in a CSV file

Let's calculate the Min, Max and Average values from this CSV file:

id, value
a, 345
b, 234
c, 1256

Using QueryRecord, we can analyze records with SQL! (QueryRecord uses Apache Calcite)

select
 min("value") as "min",
 max("value") as "max",
 avg("value") as "avg"
from FLOWFILE

Result:

min,max,avg
234,1256,611
<?xml version="1.0" ?>
<template encoding-version="1.1">
<description></description>
<groupId>1e8778a8-015c-1000-c747-8273276bab45</groupId>
<name>QueryMinMax</name>
<snippet>
<processGroups>
<id>5364fc49-9be0-3f9a-0000-000000000000</id>
<parentGroupId>5454b790-94fe-3e28-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>0.0</y>
</position>
<comments></comments>
<contents>
<!-- Connection: carries the "original" relationship of processor 80614c49 (QueryRecord)
     to processor 6a4a24f4 (UpdateAttribute), both in the same process group. -->
<connections>
  <id>f1b58695-2ceb-3dc9-0000-000000000000</id>
  <parentGroupId>5364fc49-9be0-3f9a-0000-000000000000</parentGroupId>
  <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
  <backPressureObjectThreshold>10000</backPressureObjectThreshold>
  <destination>
    <groupId>5364fc49-9be0-3f9a-0000-000000000000</groupId>
    <id>6a4a24f4-bdee-3ea0-0000-000000000000</id>
    <type>PROCESSOR</type>
  </destination>
  <flowFileExpiration>0 sec</flowFileExpiration>
  <labelIndex>1</labelIndex>
  <name/>
  <selectedRelationships>original</selectedRelationships>
  <source>
    <groupId>5364fc49-9be0-3f9a-0000-000000000000</groupId>
    <id>80614c49-d536-3ed2-0000-000000000000</id>
    <type>PROCESSOR</type>
  </source>
  <zIndex>0</zIndex>
</connections>
<!-- Connection: carries the "summary" relationship of processor 80614c49 (QueryRecord)
     to processor 6a4a24f4 (UpdateAttribute). The single bend point only affects how
     the line is drawn on the canvas. -->
<connections>
  <id>42e891a9-457d-39c0-0000-000000000000</id>
  <parentGroupId>5364fc49-9be0-3f9a-0000-000000000000</parentGroupId>
  <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
  <backPressureObjectThreshold>10000</backPressureObjectThreshold>
  <bends>
    <x>776.9999957679274</x>
    <y>509.49999880962423</y>
  </bends>
  <destination>
    <groupId>5364fc49-9be0-3f9a-0000-000000000000</groupId>
    <id>6a4a24f4-bdee-3ea0-0000-000000000000</id>
    <type>PROCESSOR</type>
  </destination>
  <flowFileExpiration>0 sec</flowFileExpiration>
  <labelIndex>1</labelIndex>
  <name/>
  <selectedRelationships>summary</selectedRelationships>
  <source>
    <groupId>5364fc49-9be0-3f9a-0000-000000000000</groupId>
    <id>80614c49-d536-3ed2-0000-000000000000</id>
    <type>PROCESSOR</type>
  </source>
  <zIndex>0</zIndex>
</connections>
<!-- Connection: carries the "success" relationship of processor b8fa189b (GenerateFlowFile)
     to processor 80614c49 (QueryRecord). This is the entry edge of the flow. -->
<connections>
  <id>550bbc34-8bfb-39fa-0000-000000000000</id>
  <parentGroupId>5364fc49-9be0-3f9a-0000-000000000000</parentGroupId>
  <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
  <backPressureObjectThreshold>10000</backPressureObjectThreshold>
  <destination>
    <groupId>5364fc49-9be0-3f9a-0000-000000000000</groupId>
    <id>80614c49-d536-3ed2-0000-000000000000</id>
    <type>PROCESSOR</type>
  </destination>
  <flowFileExpiration>0 sec</flowFileExpiration>
  <labelIndex>1</labelIndex>
  <name/>
  <selectedRelationships>success</selectedRelationships>
  <source>
    <groupId>5364fc49-9be0-3f9a-0000-000000000000</groupId>
    <id>b8fa189b-163c-3391-0000-000000000000</id>
    <type>PROCESSOR</type>
  </source>
  <zIndex>0</zIndex>
</connections>
<!-- Connection: carries the "failure" relationship of processor 80614c49 (QueryRecord)
     to processor 6a4a24f4 (UpdateAttribute). The single bend point only affects how
     the line is drawn on the canvas. -->
<connections>
  <id>7d4a8631-388b-35ff-0000-000000000000</id>
  <parentGroupId>5364fc49-9be0-3f9a-0000-000000000000</parentGroupId>
  <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
  <backPressureObjectThreshold>10000</backPressureObjectThreshold>
  <bends>
    <x>376.99999576792743</x>
    <y>509.49999880962423</y>
  </bends>
  <destination>
    <groupId>5364fc49-9be0-3f9a-0000-000000000000</groupId>
    <id>6a4a24f4-bdee-3ea0-0000-000000000000</id>
    <type>PROCESSOR</type>
  </destination>
  <flowFileExpiration>0 sec</flowFileExpiration>
  <labelIndex>1</labelIndex>
  <name/>
  <selectedRelationships>failure</selectedRelationships>
  <source>
    <groupId>5364fc49-9be0-3f9a-0000-000000000000</groupId>
    <id>80614c49-d536-3ed2-0000-000000000000</id>
    <type>PROCESSOR</type>
  </source>
  <zIndex>0</zIndex>
</connections>
<!-- Controller service: AvroSchemaRegistry (id c13c0758).
     Registers one schema under the key "summary": an Avro record with the three
     integer fields min, max and avg. The schema text below is data; its line
     breaks and lack of indentation are kept exactly as exported. -->
<controllerServices>
  <id>c13c0758-23b1-3255-0000-000000000000</id>
  <parentGroupId>5364fc49-9be0-3f9a-0000-000000000000</parentGroupId>
  <bundle>
    <artifact>nifi-registry-nar</artifact>
    <group>org.apache.nifi</group>
    <version>1.3.0-SNAPSHOT</version>
  </bundle>
  <comments/>
  <descriptors>
    <entry>
      <key>summary</key>
      <value>
        <name>summary</name>
      </value>
    </entry>
  </descriptors>
  <name>AvroSchemaRegistry</name>
  <persistsState>false</persistsState>
  <properties>
    <entry>
      <key>summary</key>
      <value>{
"type": "record",
"name": "summary",
"fields": [
{"name": "min", "type": "int"},
{"name": "max", "type": "int"},
{"name": "avg", "type": "int"}
]
}</value>
    </entry>
  </properties>
  <state>ENABLED</state>
  <type>org.apache.nifi.schemaregistry.services.AvroSchemaRegistry</type>
</controllerServices>
<!-- Controller service: CSVRecordSetWriter (id 1570cdb3).
     The only property given an explicit value is schema-registry, which points at
     controller service c13c0758 (the AvroSchemaRegistry in this group). Every other
     property is listed with a key but no value.
     NOTE(review): presumably the valueless properties fall back to the service's
     defaults, so the schema is looked up by name in that registry. Confirm against
     the CSVRecordSetWriter documentation for this NiFi version. -->
<controllerServices>
  <id>1570cdb3-d196-3ffb-0000-000000000000</id>
  <parentGroupId>5364fc49-9be0-3f9a-0000-000000000000</parentGroupId>
  <bundle>
    <artifact>nifi-record-serialization-services-nar</artifact>
    <group>org.apache.nifi</group>
    <version>1.3.0-SNAPSHOT</version>
  </bundle>
  <comments/>
  <descriptors>
    <entry>
      <key>Schema Write Strategy</key>
      <value>
        <name>Schema Write Strategy</name>
      </value>
    </entry>
    <entry>
      <key>schema-access-strategy</key>
      <value>
        <name>schema-access-strategy</name>
      </value>
    </entry>
    <entry>
      <key>schema-registry</key>
      <value>
        <identifiesControllerService>org.apache.nifi.schemaregistry.services.SchemaRegistry</identifiesControllerService>
        <name>schema-registry</name>
      </value>
    </entry>
    <entry>
      <key>schema-name</key>
      <value>
        <name>schema-name</name>
      </value>
    </entry>
    <entry>
      <key>schema-text</key>
      <value>
        <name>schema-text</name>
      </value>
    </entry>
    <entry>
      <key>Date Format</key>
      <value>
        <name>Date Format</name>
      </value>
    </entry>
    <entry>
      <key>Time Format</key>
      <value>
        <name>Time Format</name>
      </value>
    </entry>
    <entry>
      <key>Timestamp Format</key>
      <value>
        <name>Timestamp Format</name>
      </value>
    </entry>
    <entry>
      <key>CSV Format</key>
      <value>
        <name>CSV Format</name>
      </value>
    </entry>
    <entry>
      <key>Value Separator</key>
      <value>
        <name>Value Separator</name>
      </value>
    </entry>
    <entry>
      <key>Include Header Line</key>
      <value>
        <name>Include Header Line</name>
      </value>
    </entry>
    <entry>
      <key>Quote Character</key>
      <value>
        <name>Quote Character</name>
      </value>
    </entry>
    <entry>
      <key>Escape Character</key>
      <value>
        <name>Escape Character</name>
      </value>
    </entry>
    <entry>
      <key>Comment Marker</key>
      <value>
        <name>Comment Marker</name>
      </value>
    </entry>
    <entry>
      <key>Null String</key>
      <value>
        <name>Null String</name>
      </value>
    </entry>
    <entry>
      <key>Trim Fields</key>
      <value>
        <name>Trim Fields</name>
      </value>
    </entry>
    <entry>
      <key>Quote Mode</key>
      <value>
        <name>Quote Mode</name>
      </value>
    </entry>
    <entry>
      <key>Record Separator</key>
      <value>
        <name>Record Separator</name>
      </value>
    </entry>
    <entry>
      <key>Include Trailing Delimiter</key>
      <value>
        <name>Include Trailing Delimiter</name>
      </value>
    </entry>
  </descriptors>
  <name>CSVRecordSetWriter</name>
  <persistsState>false</persistsState>
  <properties>
    <entry>
      <key>Schema Write Strategy</key>
    </entry>
    <entry>
      <key>schema-access-strategy</key>
    </entry>
    <entry>
      <key>schema-registry</key>
      <value>c13c0758-23b1-3255-0000-000000000000</value>
    </entry>
    <entry>
      <key>schema-name</key>
    </entry>
    <entry>
      <key>schema-text</key>
    </entry>
    <entry>
      <key>Date Format</key>
    </entry>
    <entry>
      <key>Time Format</key>
    </entry>
    <entry>
      <key>Timestamp Format</key>
    </entry>
    <entry>
      <key>CSV Format</key>
    </entry>
    <entry>
      <key>Value Separator</key>
    </entry>
    <entry>
      <key>Include Header Line</key>
    </entry>
    <entry>
      <key>Quote Character</key>
    </entry>
    <entry>
      <key>Escape Character</key>
    </entry>
    <entry>
      <key>Comment Marker</key>
    </entry>
    <entry>
      <key>Null String</key>
    </entry>
    <entry>
      <key>Trim Fields</key>
    </entry>
    <entry>
      <key>Quote Mode</key>
    </entry>
    <entry>
      <key>Record Separator</key>
    </entry>
    <entry>
      <key>Include Trailing Delimiter</key>
    </entry>
  </properties>
  <state>ENABLED</state>
  <type>org.apache.nifi.csv.CSVRecordSetWriter</type>
</controllerServices>
<!-- Controller service: CSVReader (id 5b002675).
     Explicitly configured properties:
       schema-access-strategy = schema-text-property
       schema-text            = an inline Avro record with fields id (string) and value (int)
       Skip Header Line       = true
     The inline record is named "summary" even though it describes the input rows;
     the same name is also used for the output schema key in the AvroSchemaRegistry.
     All remaining properties are listed with a key but no value. The schema text
     below is data; its line breaks are kept exactly as exported. -->
<controllerServices>
  <id>5b002675-3659-3d57-0000-000000000000</id>
  <parentGroupId>5364fc49-9be0-3f9a-0000-000000000000</parentGroupId>
  <bundle>
    <artifact>nifi-record-serialization-services-nar</artifact>
    <group>org.apache.nifi</group>
    <version>1.3.0-SNAPSHOT</version>
  </bundle>
  <comments/>
  <descriptors>
    <entry>
      <key>schema-access-strategy</key>
      <value>
        <name>schema-access-strategy</name>
      </value>
    </entry>
    <entry>
      <key>schema-registry</key>
      <value>
        <identifiesControllerService>org.apache.nifi.schemaregistry.services.SchemaRegistry</identifiesControllerService>
        <name>schema-registry</name>
      </value>
    </entry>
    <entry>
      <key>schema-name</key>
      <value>
        <name>schema-name</name>
      </value>
    </entry>
    <entry>
      <key>schema-text</key>
      <value>
        <name>schema-text</name>
      </value>
    </entry>
    <entry>
      <key>Date Format</key>
      <value>
        <name>Date Format</name>
      </value>
    </entry>
    <entry>
      <key>Time Format</key>
      <value>
        <name>Time Format</name>
      </value>
    </entry>
    <entry>
      <key>Timestamp Format</key>
      <value>
        <name>Timestamp Format</name>
      </value>
    </entry>
    <entry>
      <key>CSV Format</key>
      <value>
        <name>CSV Format</name>
      </value>
    </entry>
    <entry>
      <key>Value Separator</key>
      <value>
        <name>Value Separator</name>
      </value>
    </entry>
    <entry>
      <key>Skip Header Line</key>
      <value>
        <name>Skip Header Line</name>
      </value>
    </entry>
    <entry>
      <key>Quote Character</key>
      <value>
        <name>Quote Character</name>
      </value>
    </entry>
    <entry>
      <key>Escape Character</key>
      <value>
        <name>Escape Character</name>
      </value>
    </entry>
    <entry>
      <key>Comment Marker</key>
      <value>
        <name>Comment Marker</name>
      </value>
    </entry>
    <entry>
      <key>Null String</key>
      <value>
        <name>Null String</name>
      </value>
    </entry>
    <entry>
      <key>Trim Fields</key>
      <value>
        <name>Trim Fields</name>
      </value>
    </entry>
  </descriptors>
  <name>CSVReader</name>
  <persistsState>false</persistsState>
  <properties>
    <entry>
      <key>schema-access-strategy</key>
      <value>schema-text-property</value>
    </entry>
    <entry>
      <key>schema-registry</key>
    </entry>
    <entry>
      <key>schema-name</key>
    </entry>
    <entry>
      <key>schema-text</key>
      <value>{
"type": "record",
"name": "summary",
"fields": [
{"name": "id", "type": "string"},
{"name": "value", "type": "int"}
]
}</value>
    </entry>
    <entry>
      <key>Date Format</key>
    </entry>
    <entry>
      <key>Time Format</key>
    </entry>
    <entry>
      <key>Timestamp Format</key>
    </entry>
    <entry>
      <key>CSV Format</key>
    </entry>
    <entry>
      <key>Value Separator</key>
    </entry>
    <entry>
      <key>Skip Header Line</key>
      <value>true</value>
    </entry>
    <entry>
      <key>Quote Character</key>
    </entry>
    <entry>
      <key>Escape Character</key>
    </entry>
    <entry>
      <key>Comment Marker</key>
    </entry>
    <entry>
      <key>Null String</key>
    </entry>
    <entry>
      <key>Trim Fields</key>
    </entry>
  </properties>
  <state>ENABLED</state>
  <type>org.apache.nifi.csv.CSVReader</type>
</controllerServices>
<!-- Processor: QueryRecord (id 80614c49).
     record-reader points at controller service 5b002675 (CSVReader) and record-writer
     at 1570cdb3 (CSVRecordSetWriter). The property named "summary" holds the SQL that
     selects min, max and avg of the "value" column from FLOWFILE; a relationship with
     the same name "summary" is declared alongside "failure" and "original".
     The SQL text below is data; its line breaks are kept exactly as exported. -->
<processors>
  <id>80614c49-d536-3ed2-0000-000000000000</id>
  <parentGroupId>5364fc49-9be0-3f9a-0000-000000000000</parentGroupId>
  <position>
    <x>392.99999576792743</x>
    <y>322.9999988096242</y>
  </position>
  <bundle>
    <artifact>nifi-standard-nar</artifact>
    <group>org.apache.nifi</group>
    <version>1.3.0-SNAPSHOT</version>
  </bundle>
  <config>
    <bulletinLevel>WARN</bulletinLevel>
    <comments/>
    <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
    <descriptors>
      <entry>
        <key>record-reader</key>
        <value>
          <identifiesControllerService>org.apache.nifi.serialization.RecordReaderFactory</identifiesControllerService>
          <name>record-reader</name>
        </value>
      </entry>
      <entry>
        <key>record-writer</key>
        <value>
          <identifiesControllerService>org.apache.nifi.serialization.RecordSetWriterFactory</identifiesControllerService>
          <name>record-writer</name>
        </value>
      </entry>
      <entry>
        <key>include-zero-record-flowfiles</key>
        <value>
          <name>include-zero-record-flowfiles</name>
        </value>
      </entry>
      <entry>
        <key>cache-schema</key>
        <value>
          <name>cache-schema</name>
        </value>
      </entry>
      <entry>
        <key>summary</key>
        <value>
          <name>summary</name>
        </value>
      </entry>
    </descriptors>
    <executionNode>ALL</executionNode>
    <lossTolerant>false</lossTolerant>
    <penaltyDuration>30 sec</penaltyDuration>
    <properties>
      <entry>
        <key>record-reader</key>
        <value>5b002675-3659-3d57-0000-000000000000</value>
      </entry>
      <entry>
        <key>record-writer</key>
        <value>1570cdb3-d196-3ffb-0000-000000000000</value>
      </entry>
      <entry>
        <key>include-zero-record-flowfiles</key>
        <value>true</value>
      </entry>
      <entry>
        <key>cache-schema</key>
        <value>true</value>
      </entry>
      <entry>
        <key>summary</key>
        <value>select
min("value") as "min",
max("value") as "max",
avg("value") as "avg"
from FLOWFILE</value>
      </entry>
    </properties>
    <runDurationMillis>0</runDurationMillis>
    <schedulingPeriod>0 sec</schedulingPeriod>
    <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
    <yieldDuration>1 sec</yieldDuration>
  </config>
  <name>QueryRecord</name>
  <relationships>
    <autoTerminate>false</autoTerminate>
    <name>failure</name>
  </relationships>
  <relationships>
    <autoTerminate>false</autoTerminate>
    <name>original</name>
  </relationships>
  <relationships>
    <autoTerminate>false</autoTerminate>
    <name>summary</name>
  </relationships>
  <style/>
  <type>org.apache.nifi.processors.standard.QueryRecord</type>
</processors>
<!-- Processor: GenerateFlowFile (id b8fa189b).
     Emits the sample CSV held in generate-ff-custom-text (a header line plus three
     rows) and carries a property schema.name = summary. Scheduling is timer driven
     with a period of 1d. The custom text below is data; its line breaks and the
     space after each comma are kept exactly as exported.
     NOTE(review): schema.name presumably becomes a FlowFile attribute that the
     record writer uses to pick the "summary" schema. Confirm against the
     GenerateFlowFile documentation. -->
<processors>
  <id>b8fa189b-163c-3391-0000-000000000000</id>
  <parentGroupId>5364fc49-9be0-3f9a-0000-000000000000</parentGroupId>
  <position>
    <x>391.99999576792743</x>
    <y>88.99999880962417</y>
  </position>
  <bundle>
    <artifact>nifi-standard-nar</artifact>
    <group>org.apache.nifi</group>
    <version>1.3.0-SNAPSHOT</version>
  </bundle>
  <config>
    <bulletinLevel>WARN</bulletinLevel>
    <comments/>
    <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
    <descriptors>
      <entry>
        <key>File Size</key>
        <value>
          <name>File Size</name>
        </value>
      </entry>
      <entry>
        <key>Batch Size</key>
        <value>
          <name>Batch Size</name>
        </value>
      </entry>
      <entry>
        <key>Data Format</key>
        <value>
          <name>Data Format</name>
        </value>
      </entry>
      <entry>
        <key>Unique FlowFiles</key>
        <value>
          <name>Unique FlowFiles</name>
        </value>
      </entry>
      <entry>
        <key>generate-ff-custom-text</key>
        <value>
          <name>generate-ff-custom-text</name>
        </value>
      </entry>
      <entry>
        <key>schema.name</key>
        <value>
          <name>schema.name</name>
        </value>
      </entry>
    </descriptors>
    <executionNode>ALL</executionNode>
    <lossTolerant>false</lossTolerant>
    <penaltyDuration>30 sec</penaltyDuration>
    <properties>
      <entry>
        <key>File Size</key>
        <value>0B</value>
      </entry>
      <entry>
        <key>Batch Size</key>
        <value>1</value>
      </entry>
      <entry>
        <key>Data Format</key>
        <value>Text</value>
      </entry>
      <entry>
        <key>Unique FlowFiles</key>
        <value>false</value>
      </entry>
      <entry>
        <key>generate-ff-custom-text</key>
        <value>id, value
a, 345
b, 234
c, 1256</value>
      </entry>
      <entry>
        <key>schema.name</key>
        <value>summary</value>
      </entry>
    </properties>
    <runDurationMillis>0</runDurationMillis>
    <schedulingPeriod>1d</schedulingPeriod>
    <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
    <yieldDuration>1 sec</yieldDuration>
  </config>
  <name>GenerateFlowFile</name>
  <relationships>
    <autoTerminate>false</autoTerminate>
    <name>success</name>
  </relationships>
  <style/>
  <type>org.apache.nifi.processors.standard.GenerateFlowFile</type>
</processors>
<!-- Processor: UpdateAttribute (id 6a4a24f4).
     Destination of the "original", "summary" and "failure" connections coming from
     QueryRecord. The only property with a value is Store State = "Do not store state".
     Its "success" relationship is not auto-terminated, and no connection in this
     process group uses this processor as a source.
     NOTE(review): this looks like a deliberate end point for inspecting queued
     results, but the relationship may need to be connected or auto-terminated
     before the processor can run. Confirm in the NiFi UI. -->
<processors>
  <id>6a4a24f4-bdee-3ea0-0000-000000000000</id>
  <parentGroupId>5364fc49-9be0-3f9a-0000-000000000000</parentGroupId>
  <position>
    <x>410.99999576792743</x>
    <y>565.9999988096242</y>
  </position>
  <bundle>
    <artifact>nifi-update-attribute-nar</artifact>
    <group>org.apache.nifi</group>
    <version>1.3.0-SNAPSHOT</version>
  </bundle>
  <config>
    <bulletinLevel>WARN</bulletinLevel>
    <comments/>
    <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
    <descriptors>
      <entry>
        <key>Delete Attributes Expression</key>
        <value>
          <name>Delete Attributes Expression</name>
        </value>
      </entry>
      <entry>
        <key>Store State</key>
        <value>
          <name>Store State</name>
        </value>
      </entry>
      <entry>
        <key>Stateful Variables Initial Value</key>
        <value>
          <name>Stateful Variables Initial Value</name>
        </value>
      </entry>
    </descriptors>
    <executionNode>ALL</executionNode>
    <lossTolerant>false</lossTolerant>
    <penaltyDuration>30 sec</penaltyDuration>
    <properties>
      <entry>
        <key>Delete Attributes Expression</key>
      </entry>
      <entry>
        <key>Store State</key>
        <value>Do not store state</value>
      </entry>
      <entry>
        <key>Stateful Variables Initial Value</key>
      </entry>
    </properties>
    <runDurationMillis>0</runDurationMillis>
    <schedulingPeriod>0 sec</schedulingPeriod>
    <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
    <yieldDuration>1 sec</yieldDuration>
  </config>
  <name>UpdateAttribute</name>
  <relationships>
    <autoTerminate>false</autoTerminate>
    <name>success</name>
  </relationships>
  <style/>
  <type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
</contents>
<name>QueryMinMax</name>
</processGroups>
</snippet>
<timestamp>06/07/2017 11:08:57 JST</timestamp>
</template>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment