Skip to content

Instantly share code, notes, and snippets.

@ijokarumawak
Last active February 15, 2022 11:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save ijokarumawak/85a3d77297ea94614e9f3f2a9dabca67 to your computer and use it in GitHub Desktop.
Save ijokarumawak/85a3d77297ea94614e9f3f2a9dabca67 to your computer and use it in GitHub Desktop.

NiFi Example: Back-pressure mixed with Wait/Notify

This Gist explains how to use back-pressure together with the Wait/Notify processors so that the source processor is not scheduled again until downstream processing has completely finished.

See also the Apache NiFi JIRA issue NIFI-3452 for details.

<?xml version="1.0" ?>
<template encoding-version="1.0">
<!-- Template-level metadata: a free-text description, the id of the group the
     template was exported from, and the template's display name. The flow
     itself is defined under snippet/processGroups below. -->
<description>Example flow to keep incoming connection back-pressure activated.</description>
<groupId>166b671c-015a-1000-0130-4e0e7f40cd58</groupId>
<name>Back-pressure Mixed with Wait/Notify</name>
<snippet>
<controllerServices>
  <!-- Cache client shared by the Wait (015a1016) and Notify (015a1028)
       processors; both reference this service id in their distributed cache
       property. It connects to localhost:4557 with a 30 secs timeout and no SSL
       context. No cache server controller service appears in this template, so
       presumably a DistributedMapCacheServer must already be listening on that
       port. Confirm before importing. -->
  <id>015a102b-b258-1144-0000-000000000000</id>
  <parentGroupId>166b671c-015a-1000-0000-000000000000</parentGroupId>
  <comments/>
  <descriptors>
    <entry>
      <key>Server Hostname</key>
      <value>
        <name>Server Hostname</name>
      </value>
    </entry>
    <entry>
      <key>Server Port</key>
      <value>
        <name>Server Port</name>
      </value>
    </entry>
    <entry>
      <key>SSL Context Service</key>
      <value>
        <identifiesControllerService>org.apache.nifi.ssl.SSLContextService</identifiesControllerService>
        <name>SSL Context Service</name>
      </value>
    </entry>
    <entry>
      <key>Communications Timeout</key>
      <value>
        <name>Communications Timeout</name>
      </value>
    </entry>
  </descriptors>
  <name>DistributedMapCacheClientService</name>
  <properties>
    <entry>
      <key>Server Hostname</key>
      <value>localhost</value>
    </entry>
    <entry>
      <key>Server Port</key>
      <value>4557</value>
    </entry>
    <!-- Left without a value: no SSL context is configured. -->
    <entry>
      <key>SSL Context Service</key>
    </entry>
    <entry>
      <key>Communications Timeout</key>
      <value>30 secs</value>
    </entry>
  </properties>
  <state>ENABLED</state>
  <type>org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService</type>
</controllerServices>
<processGroups>
<!-- Identity and canvas placement of the process group that holds the whole
     example flow; every connection, label and processor below uses this id as
     its parentGroupId. -->
<id>22aa2f94-015a-1000-0000-000000000000</id>
<parentGroupId>166b671c-015a-1000-0000-000000000000</parentGroupId>
<position>
  <x>0.0</x>
  <y>0.0</y>
</position>
<comments/>
<contents>
<connections>
  <!-- GenerateFlowFile (015a101a) "success" into UpdateAttribute (015a101f),
       the first hop of the Notify branch. The object back-pressure threshold
       of 1 is what keeps GenerateFlowFile from being scheduled while a FlowFile
       is still queued here. -->
  <id>015a1021-b258-1144-0000-000000000000</id>
  <parentGroupId>22aa2f94-015a-1000-0000-000000000000</parentGroupId>
  <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
  <backPressureObjectThreshold>1</backPressureObjectThreshold>
  <bends>
    <x>1139.7100830078125</x>
    <y>264.4938659667969</y>
  </bends>
  <destination>
    <groupId>22aa2f94-015a-1000-0000-000000000000</groupId>
    <id>015a101f-b258-1144-0000-000000000000</id>
    <type>PROCESSOR</type>
  </destination>
  <flowFileExpiration>0 sec</flowFileExpiration>
  <labelIndex>0</labelIndex>
  <name/>
  <selectedRelationships>success</selectedRelationships>
  <source>
    <groupId>22aa2f94-015a-1000-0000-000000000000</groupId>
    <id>015a101a-b258-1144-0000-000000000000</id>
    <type>PROCESSOR</type>
  </source>
  <zIndex>0</zIndex>
</connections>
<connections>
  <!-- GenerateFlowFile (015a101a) "success" into Wait (015a1016). Object
       back-pressure threshold 1. NOTE(review): with wait-mode "keep" on the Wait
       processor, the waiting FlowFile is presumably held in this queue, which
       keeps back-pressure engaged on GenerateFlowFile until Notify signals.
       Confirm against the NiFi version in use. -->
  <id>015a1025-b258-1144-0000-000000000000</id>
  <parentGroupId>22aa2f94-015a-1000-0000-000000000000</parentGroupId>
  <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
  <backPressureObjectThreshold>1</backPressureObjectThreshold>
  <bends>
    <x>1844.0352783203125</x>
    <y>123.11002349853516</y>
  </bends>
  <destination>
    <groupId>22aa2f94-015a-1000-0000-000000000000</groupId>
    <id>015a1016-b258-1144-0000-000000000000</id>
    <type>PROCESSOR</type>
  </destination>
  <flowFileExpiration>0 sec</flowFileExpiration>
  <labelIndex>0</labelIndex>
  <name/>
  <selectedRelationships>success</selectedRelationships>
  <source>
    <groupId>22aa2f94-015a-1000-0000-000000000000</groupId>
    <id>015a101a-b258-1144-0000-000000000000</id>
    <type>PROCESSOR</type>
  </source>
  <zIndex>0</zIndex>
</connections>
<connections>
  <!-- Wait (015a1016) "success", i.e. released FlowFiles, into LogAttribute
       (015a102c). Default-sized back-pressure (10000 objects / 1 GB). -->
  <id>015a102d-b258-1144-0000-000000000000</id>
  <parentGroupId>22aa2f94-015a-1000-0000-000000000000</parentGroupId>
  <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
  <backPressureObjectThreshold>10000</backPressureObjectThreshold>
  <bends>
    <x>2139.77392578125</x>
    <y>396.7980651855469</y>
  </bends>
  <destination>
    <groupId>22aa2f94-015a-1000-0000-000000000000</groupId>
    <id>015a102c-b258-1144-0000-000000000000</id>
    <type>PROCESSOR</type>
  </destination>
  <flowFileExpiration>0 sec</flowFileExpiration>
  <labelIndex>0</labelIndex>
  <name/>
  <selectedRelationships>success</selectedRelationships>
  <source>
    <groupId>22aa2f94-015a-1000-0000-000000000000</groupId>
    <id>015a1016-b258-1144-0000-000000000000</id>
    <type>PROCESSOR</type>
  </source>
  <zIndex>0</zIndex>
</connections>
<connections>
  <!-- Self-loop: Wait (015a1016) "wait" relationship routed back into Wait.
       The two bends draw the loop on the canvas. With wait-mode "keep" on that
       processor this route is presumably unused, but the relationship must be
       connected or auto-terminated for the processor to be valid. -->
  <id>015a1034-b258-1144-0000-000000000000</id>
  <parentGroupId>22aa2f94-015a-1000-0000-000000000000</parentGroupId>
  <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
  <backPressureObjectThreshold>10000</backPressureObjectThreshold>
  <bends>
    <x>2126.1314857786847</x>
    <y>237.2635618947619</y>
  </bends>
  <bends>
    <x>2126.1314857786847</x>
    <y>287.2635618947619</y>
  </bends>
  <destination>
    <groupId>22aa2f94-015a-1000-0000-000000000000</groupId>
    <id>015a1016-b258-1144-0000-000000000000</id>
    <type>PROCESSOR</type>
  </destination>
  <flowFileExpiration>0 sec</flowFileExpiration>
  <labelIndex>1</labelIndex>
  <name/>
  <selectedRelationships>wait</selectedRelationships>
  <source>
    <groupId>22aa2f94-015a-1000-0000-000000000000</groupId>
    <id>015a1016-b258-1144-0000-000000000000</id>
    <type>PROCESSOR</type>
  </source>
  <zIndex>0</zIndex>
</connections>
<connections>
  <!-- UpdateAttribute (015a101f) "success" into ControlRate (015a102f),
       the middle hop of the Notify branch. -->
  <id>22b059de-015a-1000-0000-000000000000</id>
  <parentGroupId>22aa2f94-015a-1000-0000-000000000000</parentGroupId>
  <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
  <backPressureObjectThreshold>10000</backPressureObjectThreshold>
  <bends>
    <x>1137.1158447265625</x>
    <y>411.0661315917969</y>
  </bends>
  <destination>
    <groupId>22aa2f94-015a-1000-0000-000000000000</groupId>
    <id>015a102f-b258-1144-0000-000000000000</id>
    <type>PROCESSOR</type>
  </destination>
  <flowFileExpiration>0 sec</flowFileExpiration>
  <labelIndex>0</labelIndex>
  <name/>
  <selectedRelationships>success</selectedRelationships>
  <source>
    <groupId>22aa2f94-015a-1000-0000-000000000000</groupId>
    <id>015a101f-b258-1144-0000-000000000000</id>
    <type>PROCESSOR</type>
  </source>
  <zIndex>0</zIndex>
</connections>
<connections>
  <!-- ControlRate (015a102f) "success" into Notify (015a1028), the last hop
       of the Notify branch. -->
  <id>22b06e4c-015a-1000-0000-000000000000</id>
  <parentGroupId>22aa2f94-015a-1000-0000-000000000000</parentGroupId>
  <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
  <backPressureObjectThreshold>10000</backPressureObjectThreshold>
  <bends>
    <x>1137.1158447265625</x>
    <y>547.2615966796875</y>
  </bends>
  <destination>
    <groupId>22aa2f94-015a-1000-0000-000000000000</groupId>
    <id>015a1028-b258-1144-0000-000000000000</id>
    <type>PROCESSOR</type>
  </destination>
  <flowFileExpiration>0 sec</flowFileExpiration>
  <labelIndex>0</labelIndex>
  <name/>
  <selectedRelationships>success</selectedRelationships>
  <source>
    <groupId>22aa2f94-015a-1000-0000-000000000000</groupId>
    <id>015a102f-b258-1144-0000-000000000000</id>
    <type>PROCESSOR</type>
  </source>
  <zIndex>0</zIndex>
</connections>
<labels>
  <!-- Canvas annotation: a 380 x 450 box marking the section whose completion
       gates the next GenerateFlowFile run. The label text is kept on one line
       because whitespace inside it would become part of the displayed text. -->
  <id>22bacf32-015a-1000-0000-000000000000</id>
  <parentGroupId>22aa2f94-015a-1000-0000-000000000000</parentGroupId>
  <position>
    <x>1268.1229191895368</x>
    <y>25.827520543647317</y>
  </position>
  <height>450.0935974121094</height>
  <label>Don't schedule next until a FlowFile completely goes thru this part.</label>
  <style>
    <entry>
      <key>font-size</key>
      <value>12px</value>
    </entry>
  </style>
  <width>380.0502624511719</width>
</labels>
<processors>
  <!-- Wait processor (015a1016). Configured with release-signal-id
       "arbitrary-id-${hostname()}" (the same identifier Notify 015a1028 uses),
       target-signal-count 1, an empty signal-counter-name, and cache service
       015a102b. It runs every 5 sec. NOTE(review): wait-mode "keep" presumably
       leaves waiting FlowFiles in the incoming connection 015a1025 (threshold 1)
       so back-pressure on GenerateFlowFile stays active. Confirm for the NiFi
       version used. The "expired" (after 10 min) and "failure" relationships are
       auto-terminated, so those FlowFiles are dropped. -->
  <id>015a1016-b258-1144-0000-000000000000</id>
  <parentGroupId>22aa2f94-015a-1000-0000-000000000000</parentGroupId>
  <position>
    <x>1671.1314857786851</x>
    <y>197.2635618947619</y>
  </position>
  <config>
    <bulletinLevel>WARN</bulletinLevel>
    <comments/>
    <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
    <descriptors>
      <entry>
        <key>Release Signal Identifier</key>
        <value>
          <name>Release Signal Identifier</name>
        </value>
      </entry>
      <entry>
        <key>Target Signal Count</key>
        <value>
          <name>Target Signal Count</name>
        </value>
      </entry>
      <entry>
        <key>Signal Counter Name</key>
        <value>
          <name>Signal Counter Name</name>
        </value>
      </entry>
      <entry>
        <key>Expiration Duration</key>
        <value>
          <name>Expiration Duration</name>
        </value>
      </entry>
      <entry>
        <key>Distributed Cache Service</key>
        <value>
          <identifiesControllerService>org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient</identifiesControllerService>
          <name>Distributed Cache Service</name>
        </value>
      </entry>
      <entry>
        <key>Attribute Copy Mode</key>
        <value>
          <name>Attribute Copy Mode</name>
        </value>
      </entry>
      <entry>
        <key>Wait Mode</key>
        <value>
          <name>Wait Mode</name>
        </value>
      </entry>
    </descriptors>
    <executionNode>ALL</executionNode>
    <lossTolerant>false</lossTolerant>
    <penaltyDuration>30 sec</penaltyDuration>
    <properties>
      <entry>
        <key>release-signal-id</key>
        <value>arbitrary-id-${hostname()}</value>
      </entry>
      <entry>
        <key>target-signal-count</key>
        <value>1</value>
      </entry>
      <entry>
        <key>signal-counter-name</key>
      </entry>
      <entry>
        <key>expiration-duration</key>
        <value>10 min</value>
      </entry>
      <entry>
        <key>distributed-cache-service</key>
        <value>015a102b-b258-1144-0000-000000000000</value>
      </entry>
      <entry>
        <key>attribute-copy-mode</key>
        <value>keeporiginal</value>
      </entry>
      <entry>
        <key>wait-mode</key>
        <value>keep</value>
      </entry>
    </properties>
    <runDurationMillis>0</runDurationMillis>
    <schedulingPeriod>5 sec</schedulingPeriod>
    <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
    <yieldDuration>1 sec</yieldDuration>
  </config>
  <name>Wait</name>
  <relationships>
    <autoTerminate>true</autoTerminate>
    <name>expired</name>
  </relationships>
  <relationships>
    <autoTerminate>true</autoTerminate>
    <name>failure</name>
  </relationships>
  <relationships>
    <autoTerminate>false</autoTerminate>
    <name>success</name>
  </relationships>
  <relationships>
    <autoTerminate>false</autoTerminate>
    <name>wait</name>
  </relationships>
  <style>
    <entry>
      <key>background-color</key>
      <value>#4fad2d</value>
    </entry>
  </style>
  <type>org.apache.nifi.processors.standard.Wait</type>
</processors>
<processors>
  <!-- GenerateFlowFile processor (015a101a): source of the flow. Each run
       emits a batch of 1 non-unique, 0B text FlowFile on a 5 sec timer. Its
       "success" relationship feeds both UpdateAttribute (connection 015a1021)
       and Wait (connection 015a1025). Each of those connections has an object
       back-pressure threshold of 1, so this processor is held back while either
       queue still contains a FlowFile. -->
  <id>015a101a-b258-1144-0000-000000000000</id>
  <parentGroupId>22aa2f94-015a-1000-0000-000000000000</parentGroupId>
  <position>
    <x>1285.6879039730643</x>
    <y>55.660811024508575</y>
  </position>
  <config>
    <bulletinLevel>WARN</bulletinLevel>
    <comments/>
    <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
    <descriptors>
      <entry>
        <key>File Size</key>
        <value>
          <name>File Size</name>
        </value>
      </entry>
      <entry>
        <key>Batch Size</key>
        <value>
          <name>Batch Size</name>
        </value>
      </entry>
      <entry>
        <key>Data Format</key>
        <value>
          <name>Data Format</name>
        </value>
      </entry>
      <entry>
        <key>Unique FlowFiles</key>
        <value>
          <name>Unique FlowFiles</name>
        </value>
      </entry>
      <entry>
        <key>generate-ff-custom-text</key>
        <value>
          <name>generate-ff-custom-text</name>
        </value>
      </entry>
    </descriptors>
    <executionNode>ALL</executionNode>
    <lossTolerant>false</lossTolerant>
    <penaltyDuration>30 sec</penaltyDuration>
    <properties>
      <entry>
        <key>File Size</key>
        <value>0B</value>
      </entry>
      <entry>
        <key>Batch Size</key>
        <value>1</value>
      </entry>
      <entry>
        <key>Data Format</key>
        <value>Text</value>
      </entry>
      <entry>
        <key>Unique FlowFiles</key>
        <value>false</value>
      </entry>
      <entry>
        <key>generate-ff-custom-text</key>
      </entry>
    </properties>
    <runDurationMillis>0</runDurationMillis>
    <!-- Written as "5 sec" (same duration as the former "5s") to match the
         time-period format used by every other processor in this template. -->
    <schedulingPeriod>5 sec</schedulingPeriod>
    <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
    <yieldDuration>1 sec</yieldDuration>
  </config>
  <name>GenerateFlowFile</name>
  <relationships>
    <autoTerminate>false</autoTerminate>
    <name>success</name>
  </relationships>
  <style/>
  <type>org.apache.nifi.processors.standard.GenerateFlowFile</type>
</processors>
<processors>
  <!-- UpdateAttribute processor (015a101f): first hop of the Notify branch.
       No dynamic properties are set and state storage is off, so as configured
       it appears to pass FlowFiles through unchanged. Presumably it stands in
       for the real downstream work. -->
  <id>015a101f-b258-1144-0000-000000000000</id>
  <parentGroupId>22aa2f94-015a-1000-0000-000000000000</parentGroupId>
  <position>
    <x>1285.6879039730643</x>
    <y>195.9664427541369</y>
  </position>
  <config>
    <bulletinLevel>WARN</bulletinLevel>
    <comments/>
    <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
    <descriptors>
      <entry>
        <key>Delete Attributes Expression</key>
        <value>
          <name>Delete Attributes Expression</name>
        </value>
      </entry>
      <entry>
        <key>Store State</key>
        <value>
          <name>Store State</name>
        </value>
      </entry>
      <entry>
        <key>Stateful Variables Initial Value</key>
        <value>
          <name>Stateful Variables Initial Value</name>
        </value>
      </entry>
    </descriptors>
    <executionNode>ALL</executionNode>
    <lossTolerant>false</lossTolerant>
    <penaltyDuration>30 sec</penaltyDuration>
    <properties>
      <entry>
        <key>Delete Attributes Expression</key>
      </entry>
      <entry>
        <key>Store State</key>
        <value>Do not store state</value>
      </entry>
      <entry>
        <key>Stateful Variables Initial Value</key>
      </entry>
    </properties>
    <runDurationMillis>0</runDurationMillis>
    <schedulingPeriod>0 sec</schedulingPeriod>
    <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
    <yieldDuration>1 sec</yieldDuration>
  </config>
  <name>UpdateAttribute</name>
  <relationships>
    <autoTerminate>false</autoTerminate>
    <name>success</name>
  </relationships>
  <style/>
  <type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
<processors>
  <!-- Notify processor (015a1028): end of the Notify branch. Configured with
       Release Signal Identifier "arbitrary-id-${hostname()}" (matching the Wait
       processor 015a1016), Signal Counter Name "default", and cache service
       015a102b. Both "failure" and "success" are auto-terminated. A failed
       notification is therefore dropped here, and the waiting FlowFile would
       presumably stay held until Wait's 10 min expiration. -->
  <id>015a1028-b258-1144-0000-000000000000</id>
  <parentGroupId>22aa2f94-015a-1000-0000-000000000000</parentGroupId>
  <position>
    <x>1285.6879039730643</x>
    <y>482.1165529068328</y>
  </position>
  <config>
    <bulletinLevel>WARN</bulletinLevel>
    <comments/>
    <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
    <descriptors>
      <entry>
        <key>Release Signal Identifier</key>
        <value>
          <name>Release Signal Identifier</name>
        </value>
      </entry>
      <entry>
        <key>Signal Counter Name</key>
        <value>
          <name>Signal Counter Name</name>
        </value>
      </entry>
      <entry>
        <key>Distributed Cache Service</key>
        <value>
          <identifiesControllerService>org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient</identifiesControllerService>
          <name>Distributed Cache Service</name>
        </value>
      </entry>
      <entry>
        <key>Attribute Cache Regex</key>
        <value>
          <name>Attribute Cache Regex</name>
        </value>
      </entry>
    </descriptors>
    <executionNode>ALL</executionNode>
    <lossTolerant>false</lossTolerant>
    <penaltyDuration>30 sec</penaltyDuration>
    <properties>
      <entry>
        <key>Release Signal Identifier</key>
        <value>arbitrary-id-${hostname()}</value>
      </entry>
      <entry>
        <key>Signal Counter Name</key>
        <value>default</value>
      </entry>
      <entry>
        <key>Distributed Cache Service</key>
        <value>015a102b-b258-1144-0000-000000000000</value>
      </entry>
      <entry>
        <key>Attribute Cache Regex</key>
      </entry>
    </properties>
    <runDurationMillis>0</runDurationMillis>
    <schedulingPeriod>0 sec</schedulingPeriod>
    <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
    <yieldDuration>1 sec</yieldDuration>
  </config>
  <name>Notify</name>
  <relationships>
    <autoTerminate>true</autoTerminate>
    <name>failure</name>
  </relationships>
  <relationships>
    <autoTerminate>true</autoTerminate>
    <name>success</name>
  </relationships>
  <style>
    <entry>
      <key>background-color</key>
      <value>#4fad2d</value>
    </entry>
  </style>
  <type>org.apache.nifi.processors.standard.Notify</type>
</processors>
<processors>
  <!-- LogAttribute processor (015a102c): terminal step of the Wait branch. It
       receives released FlowFiles over connection 015a102d and logs them at
       info level without payload. No connection in this template leaves this
       processor, so "success" must be auto-terminated. Otherwise NiFi marks the
       processor invalid, it cannot be started, and FlowFiles pile up in 015a102d. -->
  <id>015a102c-b258-1144-0000-000000000000</id>
  <parentGroupId>22aa2f94-015a-1000-0000-000000000000</parentGroupId>
  <position>
    <x>1671.1314857786851</x>
    <y>340.6390765806462</y>
  </position>
  <config>
    <bulletinLevel>WARN</bulletinLevel>
    <comments/>
    <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
    <descriptors>
      <entry>
        <key>Log Level</key>
        <value>
          <name>Log Level</name>
        </value>
      </entry>
      <entry>
        <key>Log Payload</key>
        <value>
          <name>Log Payload</name>
        </value>
      </entry>
      <entry>
        <key>Attributes to Log</key>
        <value>
          <name>Attributes to Log</name>
        </value>
      </entry>
      <entry>
        <key>Attributes to Ignore</key>
        <value>
          <name>Attributes to Ignore</name>
        </value>
      </entry>
      <entry>
        <key>Log prefix</key>
        <value>
          <name>Log prefix</name>
        </value>
      </entry>
    </descriptors>
    <executionNode>ALL</executionNode>
    <lossTolerant>false</lossTolerant>
    <penaltyDuration>30 sec</penaltyDuration>
    <properties>
      <entry>
        <key>Log Level</key>
        <value>info</value>
      </entry>
      <entry>
        <key>Log Payload</key>
        <value>false</value>
      </entry>
      <entry>
        <key>Attributes to Log</key>
      </entry>
      <entry>
        <key>Attributes to Ignore</key>
      </entry>
      <entry>
        <key>Log prefix</key>
      </entry>
    </properties>
    <runDurationMillis>0</runDurationMillis>
    <schedulingPeriod>0 sec</schedulingPeriod>
    <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
    <yieldDuration>1 sec</yieldDuration>
  </config>
  <name>LogAttribute</name>
  <relationships>
    <!-- Auto-terminated: this is the end of the flow. -->
    <autoTerminate>true</autoTerminate>
    <name>success</name>
  </relationships>
  <style/>
  <type>org.apache.nifi.processors.standard.LogAttribute</type>
</processors>
<processors>
  <!-- ControlRate processor (015a102f): middle hop of the Notify branch.
       Limits throughput to 1 FlowFile (criteria "flowfile count") per 10 sec,
       presumably to simulate slow downstream work so that the back-pressure
       hold on GenerateFlowFile is observable. "failure" is auto-terminated. -->
  <id>015a102f-b258-1144-0000-000000000000</id>
  <parentGroupId>22aa2f94-015a-1000-0000-000000000000</parentGroupId>
  <position>
    <x>1285.6879039730643</x>
    <y>339.3419574400212</y>
  </position>
  <config>
    <bulletinLevel>WARN</bulletinLevel>
    <comments/>
    <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
    <descriptors>
      <entry>
        <key>Rate Control Criteria</key>
        <value>
          <name>Rate Control Criteria</name>
        </value>
      </entry>
      <entry>
        <key>Maximum Rate</key>
        <value>
          <name>Maximum Rate</name>
        </value>
      </entry>
      <entry>
        <key>Rate Controlled Attribute</key>
        <value>
          <name>Rate Controlled Attribute</name>
        </value>
      </entry>
      <entry>
        <key>Time Duration</key>
        <value>
          <name>Time Duration</name>
        </value>
      </entry>
      <entry>
        <key>Grouping Attribute</key>
        <value>
          <name>Grouping Attribute</name>
        </value>
      </entry>
    </descriptors>
    <executionNode>ALL</executionNode>
    <lossTolerant>false</lossTolerant>
    <penaltyDuration>30 sec</penaltyDuration>
    <properties>
      <entry>
        <key>Rate Control Criteria</key>
        <value>flowfile count</value>
      </entry>
      <entry>
        <key>Maximum Rate</key>
        <value>1</value>
      </entry>
      <entry>
        <key>Rate Controlled Attribute</key>
      </entry>
      <entry>
        <key>Time Duration</key>
        <value>10 sec</value>
      </entry>
      <entry>
        <key>Grouping Attribute</key>
      </entry>
    </properties>
    <runDurationMillis>0</runDurationMillis>
    <schedulingPeriod>0 sec</schedulingPeriod>
    <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
    <yieldDuration>1 sec</yieldDuration>
  </config>
  <name>ControlRate</name>
  <relationships>
    <autoTerminate>true</autoTerminate>
    <name>failure</name>
  </relationships>
  <relationships>
    <autoTerminate>false</autoTerminate>
    <name>success</name>
  </relationships>
  <style/>
  <type>org.apache.nifi.processors.standard.ControlRate</type>
</processors>
</contents>
<!-- Display name of the process group that wraps the example flow
     (the same text as the template name). -->
<name>Back-pressure Mixed with Wait/Notify</name>
</processGroups>
</snippet>
<!-- Export time as written by NiFi (presumably MM/dd/yyyy HH:mm:ss zone).
     NOTE(review): keep this exact format; the template importer parses it, so
     converting it to ISO 8601 may break import. Confirm before changing. -->
<timestamp>02/09/2017 21:55:32 JST</timestamp>
</template>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment