
@ijokarumawak
Last active March 20, 2024 13:36
NiFi Loop Flow Example

This template implements the equivalent of a traditional for (i = 0; i < x; i++) loop as a NiFi data flow, with x set to 50.
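As a rough illustration of what the flow does, the loop can be simulated in plain Python. This is only a sketch under assumptions: the function name `run_loop` and the dictionary standing in for FlowFile attributes are hypothetical, and no NiFi API is involved. It mirrors the processors in the template: "Initialize Count" sets `i` to 0, the RouteOnAttribute check `${i:lt(50)}` sends the FlowFile to `continue`, one copy goes through "Update Filename" (`${i}.json`) toward PutFile, and the other goes through `i++` (`${i:plus(1)}`) back to the check.

```python
def run_loop(limit=50):
    """Simulate the loop flow; return the filenames written and the final i."""
    written = []
    # Initialize Count: UpdateAttribute sets i = 0 (attributes are strings)
    attributes = {"i": "0"}
    # RouteOnAttribute: ${i:lt(50)} routes to "continue", otherwise "unmatched"
    while int(attributes["i"]) < limit:
        # "continue" feeds two connections, so one copy enters the loop body...
        body_copy = dict(attributes)
        body_copy["filename"] = f"{body_copy['i']}.json"  # Update Filename
        written.append(body_copy["filename"])  # AttributesToJSON, then PutFile
        # ...and the other copy goes to the i++ UpdateAttribute: ${i:plus(1)}
        attributes["i"] = str(int(attributes["i"]) + 1)
    # "unmatched" reaches the Terminate (LogAttribute) processor
    return written, attributes["i"]


if __name__ == "__main__":
    files, final_i = run_loop()
    print(len(files), files[0], files[-1], final_i)  # 50 0.json 49.json 50
```

Under these assumptions the body runs for i = 0 through 49, and the FlowFile that reaches "Terminate" carries i = 50.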

<?xml version="1.0" ?>
<template encoding-version="1.0">
<description>Do loop like for( i = 0; i &lt; 50; i++).</description>
<groupId>928aad39-0157-1000-4136-99f89bfed299</groupId>
<name>Loop Sample</name>
<snippet>
<processGroups>
<id>93758603-0157-1000-0000-000000000000</id>
<parentGroupId>928aad39-0157-1000-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>0.0</y>
</position>
<comments></comments>
<contents>
<connections>
<id>9376afcd-0157-1000-0000-000000000000</id>
<parentGroupId>93758603-0157-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>93758603-0157-1000-0000-000000000000</groupId>
<id>93761c82-0157-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>93758603-0157-1000-0000-000000000000</groupId>
<id>9375e290-0157-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>937a2382-0157-1000-0000-000000000000</id>
<parentGroupId>93758603-0157-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>93758603-0157-1000-0000-000000000000</groupId>
<id>93775fab-0157-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>93758603-0157-1000-0000-000000000000</groupId>
<id>93761c82-0157-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>937c4f2b-0157-1000-0000-000000000000</id>
<parentGroupId>93758603-0157-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>93758603-0157-1000-0000-000000000000</groupId>
<id>9380a5ac-0157-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>continue</selectedRelationships>
<source>
<groupId>93758603-0157-1000-0000-000000000000</groupId>
<id>93775fab-0157-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>937caec1-0157-1000-0000-000000000000</id>
<parentGroupId>93758603-0157-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>93758603-0157-1000-0000-000000000000</groupId>
<id>937c7a38-0157-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>unmatched</selectedRelationships>
<source>
<groupId>93758603-0157-1000-0000-000000000000</groupId>
<id>93775fab-0157-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>937ea3c7-0157-1000-0000-000000000000</id>
<parentGroupId>93758603-0157-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>93758603-0157-1000-0000-000000000000</groupId>
<id>937e7aba-0157-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>93758603-0157-1000-0000-000000000000</groupId>
<id>937d9f8b-0157-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>937f70e2-0157-1000-0000-000000000000</id>
<parentGroupId>93758603-0157-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>93758603-0157-1000-0000-000000000000</groupId>
<id>93775fab-0157-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>93758603-0157-1000-0000-000000000000</groupId>
<id>937ab81d-0157-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>93b35274-0157-1000-0000-000000000000</id>
<parentGroupId>93758603-0157-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>93758603-0157-1000-0000-000000000000</groupId>
<id>937d9f8b-0157-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>93758603-0157-1000-0000-000000000000</groupId>
<id>9380a5ac-0157-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>93c1212b-0157-1000-0000-000000000000</id>
<parentGroupId>93758603-0157-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>188.88546752929688</x>
<y>812.381591796875</y>
</bends>
<destination>
<groupId>93758603-0157-1000-0000-000000000000</groupId>
<id>937ab81d-0157-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>continue</selectedRelationships>
<source>
<groupId>93758603-0157-1000-0000-000000000000</groupId>
<id>93775fab-0157-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<labels>
<id>93862697-0157-1000-0000-000000000000</id>
<parentGroupId>93758603-0157-1000-0000-000000000000</parentGroupId>
<position>
<x>322.6819797317273</x>
<y>865.5308206915452</y>
</position>
<height>724.5104370117188</height>
<label>Processing within a loop</label>
<style>
<entry>
<key>font-size</key>
<value>12px</value>
</entry>
</style>
<width>470.50579833984375</width>
</labels>
<processors>
<id>9375e290-0157-1000-0000-000000000000</id>
<parentGroupId>93758603-0157-1000-0000-000000000000</parentGroupId>
<position>
<x>370.29449464309204</x>
<y>185.62176449203753</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>File Size</key>
<value>
<name>File Size</name>
</value>
</entry>
<entry>
<key>Batch Size</key>
<value>
<name>Batch Size</name>
</value>
</entry>
<entry>
<key>Data Format</key>
<value>
<name>Data Format</name>
</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>
<name>Unique FlowFiles</name>
</value>
</entry>
</descriptors>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>File Size</key>
<value>0b</value>
</entry>
<entry>
<key>Batch Size</key>
<value>1</value>
</entry>
<entry>
<key>Data Format</key>
<value>Binary</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>false</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>1 d</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Start Processing</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style>
<entry>
<key>background-color</key>
<value>#00fa3f</value>
</entry>
</style>
<type>org.apache.nifi.processors.standard.GenerateFlowFile</type>
</processors>
<processors>
<id>93761c82-0157-1000-0000-000000000000</id>
<parentGroupId>93758603-0157-1000-0000-000000000000</parentGroupId>
<position>
<x>373.1636352680921</x>
<y>399.9999993553188</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Delete Attributes Expression</key>
<value>
<name>Delete Attributes Expression</name>
</value>
</entry>
<entry>
<key>i</key>
<value>
<name>i</name>
</value>
</entry>
</descriptors>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Delete Attributes Expression</key>
</entry>
<entry>
<key>i</key>
<value>0</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Initialize Count</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
<processors>
<id>93775fab-0157-1000-0000-000000000000</id>
<parentGroupId>93758603-0157-1000-0000-000000000000</parentGroupId>
<position>
<x>373.3599243305921</x>
<y>607.9672845115688</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Routing Strategy</key>
<value>
<name>Routing Strategy</name>
</value>
</entry>
<entry>
<key>continue</key>
<value>
<name>continue</name>
</value>
</entry>
</descriptors>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Routing Strategy</key>
<value>Route to Property name</value>
</entry>
<entry>
<key>continue</key>
<value>${i:lt(50)}</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>while: i &lt; 50</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>continue</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>unmatched</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.RouteOnAttribute</type>
</processors>
<processors>
<id>937ab81d-0157-1000-0000-000000000000</id>
<parentGroupId>93758603-0157-1000-0000-000000000000</parentGroupId>
<position>
<x>-327.88117216599</x>
<y>607.9063714256313</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Delete Attributes Expression</key>
<value>
<name>Delete Attributes Expression</name>
</value>
</entry>
<entry>
<key>i</key>
<value>
<name>i</name>
</value>
</entry>
</descriptors>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Delete Attributes Expression</key>
</entry>
<entry>
<key>i</key>
<value>${i:plus(1)}</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>i++</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
<processors>
<id>937c7a38-0157-1000-0000-000000000000</id>
<parentGroupId>93758603-0157-1000-0000-000000000000</parentGroupId>
<position>
<x>1008.8491211079358</x>
<y>602.878478359225</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Log Level</key>
<value>
<name>Log Level</name>
</value>
</entry>
<entry>
<key>Log Payload</key>
<value>
<name>Log Payload</name>
</value>
</entry>
<entry>
<key>Attributes to Log</key>
<value>
<name>Attributes to Log</name>
</value>
</entry>
<entry>
<key>Attributes to Ignore</key>
<value>
<name>Attributes to Ignore</name>
</value>
</entry>
<entry>
<key>Log prefix</key>
<value>
<name>Log prefix</name>
</value>
</entry>
</descriptors>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Log Level</key>
<value>info</value>
</entry>
<entry>
<key>Log Payload</key>
<value>false</value>
</entry>
<entry>
<key>Attributes to Log</key>
</entry>
<entry>
<key>Attributes to Ignore</key>
</entry>
<entry>
<key>Log prefix</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Terminate</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>success</name>
</relationships>
<style>
<entry>
<key>background-color</key>
<value>#eb0e03</value>
</entry>
</style>
<type>org.apache.nifi.processors.standard.LogAttribute</type>
</processors>
<processors>
<id>937d9f8b-0157-1000-0000-000000000000</id>
<parentGroupId>93758603-0157-1000-0000-000000000000</parentGroupId>
<position>
<x>373.57315064895147</x>
<y>1143.962462734225</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Attributes List</key>
<value>
<name>Attributes List</name>
</value>
</entry>
<entry>
<key>Destination</key>
<value>
<name>Destination</name>
</value>
</entry>
<entry>
<key>Include Core Attributes</key>
<value>
<name>Include Core Attributes</name>
</value>
</entry>
<entry>
<key>Null Value</key>
<value>
<name>Null Value</name>
</value>
</entry>
</descriptors>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Attributes List</key>
</entry>
<entry>
<key>Destination</key>
<value>flowfile-content</value>
</entry>
<entry>
<key>Include Core Attributes</key>
<value>true</value>
</entry>
<entry>
<key>Null Value</key>
<value>false</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>AttributesToJSON</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.AttributesToJSON</type>
</processors>
<processors>
<id>937e7aba-0157-1000-0000-000000000000</id>
<parentGroupId>93758603-0157-1000-0000-000000000000</parentGroupId>
<position>
<x>373.4956360005139</x>
<y>1410.9888909568813</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Directory</key>
<value>
<name>Directory</name>
</value>
</entry>
<entry>
<key>Conflict Resolution Strategy</key>
<value>
<name>Conflict Resolution Strategy</name>
</value>
</entry>
<entry>
<key>Create Missing Directories</key>
<value>
<name>Create Missing Directories</name>
</value>
</entry>
<entry>
<key>Maximum File Count</key>
<value>
<name>Maximum File Count</name>
</value>
</entry>
<entry>
<key>Last Modified Time</key>
<value>
<name>Last Modified Time</name>
</value>
</entry>
<entry>
<key>Permissions</key>
<value>
<name>Permissions</name>
</value>
</entry>
<entry>
<key>Owner</key>
<value>
<name>Owner</name>
</value>
</entry>
<entry>
<key>Group</key>
<value>
<name>Group</name>
</value>
</entry>
</descriptors>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Directory</key>
<value>/tmp/output</value>
</entry>
<entry>
<key>Conflict Resolution Strategy</key>
<value>replace</value>
</entry>
<entry>
<key>Create Missing Directories</key>
<value>true</value>
</entry>
<entry>
<key>Maximum File Count</key>
</entry>
<entry>
<key>Last Modified Time</key>
</entry>
<entry>
<key>Permissions</key>
</entry>
<entry>
<key>Owner</key>
</entry>
<entry>
<key>Group</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>PutFile</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>success</name>
</relationships>
<style>
<entry>
<key>background-color</key>
<value>#ff0d00</value>
</entry>
</style>
<type>org.apache.nifi.processors.standard.PutFile</type>
</processors>
<processors>
<id>9380a5ac-0157-1000-0000-000000000000</id>
<parentGroupId>93758603-0157-1000-0000-000000000000</parentGroupId>
<position>
<x>374.7201214696913</x>
<y>905.4692405157157</y>
</position>
<config>
<annotationData>&lt;criteria&gt;
&lt;flowFilePolicy&gt;USE_ORIGINAL&lt;/flowFilePolicy&gt;
&lt;/criteria&gt;</annotationData>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Delete Attributes Expression</key>
<value>
<name>Delete Attributes Expression</name>
</value>
</entry>
<entry>
<key>filename</key>
<value>
<name>filename</name>
</value>
</entry>
</descriptors>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Delete Attributes Expression</key>
</entry>
<entry>
<key>filename</key>
<value>${i}.json</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Update Filename</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
</contents>
<name>loop-sample</name>
</processGroups>
</snippet>
<timestamp>10/05/2016 16:53:17 JST</timestamp>
</template>
@panchicore

useful, thanks!

@dachosys

dachosys commented Nov 1, 2019

You could also replace one or both UpdateAttribute processors with a single ExecuteScript processor that increments the counter attribute:

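// Groovy script for ExecuteScript
def flowFile = session.get()
if (!flowFile) return
// Read the counter attribute (FlowFile attributes are always strings)
def value = flowFile.getAttribute('IMSI_GROUP')
// Increment when numeric; otherwise append "1", as the original expression did
def next = value.isInteger() ? (value.toInteger() + 1).toString() : value + '1'
// putAttribute returns the updated FlowFile reference
flowFile = session.putAttribute(flowFile, 'IMSI_GROUP', next)
session.transfer(flowFile, REL_SUCCESS)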

@Ininoro

Ininoro commented Mar 20, 2024

Am I the only one who thought this would be something about cognitive functions?
