Skip to content

Instantly share code, notes, and snippets.

@ijokarumawak
Last active February 15, 2022 11:05
Show Gist options
  • Save ijokarumawak/9e1a4855934f2bb9661f88ca625bd244 to your computer and use it in GitHub Desktop.
Save ijokarumawak/9e1a4855934f2bb9661f88ca625bd244 to your computer and use it in GitHub Desktop.
NiFi example flow which limits number of incoming FlowFiles by Wait and Notify.

The tricky part is how to setup the initial state in a DistributedMapCache in order to pass the first incoming FlowFile. Currently, Wait/Notify processor doesn't provide a way to do that but you can so by FetchDistributedMapCache.

On the right top of flow, I used GenerateFlowFile and FetchDistributedMapCache to send a FlowFile to Notify, only if there is no existing target cache key. It periodically check and create one if necessary.

On the left side, I simulated incoming FlowFiles those will be queued and processed one by one.

<?xml version="1.0" ?>
<template encoding-version="1.1">
<description></description>
<groupId>29c27336-015d-1000-3991-b2f025e19832</groupId>
<name>Using WaitNotify to limit flow</name>
<snippet>
<controllerServices>
<id>bd6674a6-3b20-3cdb-0000-000000000000</id>
<parentGroupId>35fe8898-bd57-3b85-0000-000000000000</parentGroupId>
<bundle>
<artifact>nifi-distributed-cache-services-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<comments></comments>
<descriptors>
<entry>
<key>Server Hostname</key>
<value>
<name>Server Hostname</name>
</value>
</entry>
<entry>
<key>Server Port</key>
<value>
<name>Server Port</name>
</value>
</entry>
<entry>
<key>SSL Context Service</key>
<value>
<identifiesControllerService>org.apache.nifi.ssl.SSLContextService</identifiesControllerService>
<name>SSL Context Service</name>
</value>
</entry>
<entry>
<key>Communications Timeout</key>
<value>
<name>Communications Timeout</name>
</value>
</entry>
</descriptors>
<name>DistributedMapCacheClientService</name>
<persistsState>false</persistsState>
<properties>
<entry>
<key>Server Hostname</key>
<value>localhost</value>
</entry>
<entry>
<key>Server Port</key>
<value>4557</value>
</entry>
<entry>
<key>SSL Context Service</key>
</entry>
<entry>
<key>Communications Timeout</key>
<value>30 secs</value>
</entry>
</properties>
<state>ENABLED</state>
<type>org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService</type>
</controllerServices>
<processGroups>
<id>98352fd8-023e-3c2f-0000-000000000000</id>
<parentGroupId>35fe8898-bd57-3b85-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>0.0</y>
</position>
<comments></comments>
<contents>
<connections>
<id>903f8dd6-6c53-3423-0000-000000000000</id>
<parentGroupId>98352fd8-023e-3c2f-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>3335.579345703125</x>
<y>1428.97216796875</y>
</bends>
<destination>
<groupId>98352fd8-023e-3c2f-0000-000000000000</groupId>
<id>26899782-7cd2-3c80-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>0</labelIndex>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<source>
<groupId>98352fd8-023e-3c2f-0000-000000000000</groupId>
<id>2be98623-184a-309f-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>abfe5efb-f7fd-3bbf-0000-000000000000</id>
<parentGroupId>98352fd8-023e-3c2f-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>2556.359619140625</x>
<y>1097.1597900390625</y>
</bends>
<destination>
<groupId>98352fd8-023e-3c2f-0000-000000000000</groupId>
<id>08737b4f-305c-38a0-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>0</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>98352fd8-023e-3c2f-0000-000000000000</groupId>
<id>30916515-e4a0-3a26-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>bb02c6d1-fcff-3202-0000-000000000000</id>
<parentGroupId>98352fd8-023e-3c2f-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>98352fd8-023e-3c2f-0000-000000000000</groupId>
<id>26899782-7cd2-3c80-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>0</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>98352fd8-023e-3c2f-0000-000000000000</groupId>
<id>2be98623-184a-309f-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>0c8eb801-8265-35d5-0000-000000000000</id>
<parentGroupId>98352fd8-023e-3c2f-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>2349.107666015625</x>
<y>1427.5341796875</y>
</bends>
<destination>
<groupId>98352fd8-023e-3c2f-0000-000000000000</groupId>
<id>26899782-7cd2-3c80-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>0</labelIndex>
<name></name>
<selectedRelationships>expired</selectedRelationships>
<selectedRelationships>failure</selectedRelationships>
<source>
<groupId>98352fd8-023e-3c2f-0000-000000000000</groupId>
<id>30916515-e4a0-3a26-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>274153bd-dbac-37b7-0000-000000000000</id>
<parentGroupId>98352fd8-023e-3c2f-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>98352fd8-023e-3c2f-0000-000000000000</groupId>
<id>26899782-7cd2-3c80-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<source>
<groupId>98352fd8-023e-3c2f-0000-000000000000</groupId>
<id>08737b4f-305c-38a0-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>37524d8c-19f2-32b8-0000-000000000000</id>
<parentGroupId>98352fd8-023e-3c2f-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>98352fd8-023e-3c2f-0000-000000000000</groupId>
<id>2be98623-184a-309f-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>not-found</selectedRelationships>
<source>
<groupId>98352fd8-023e-3c2f-0000-000000000000</groupId>
<id>d8f1feb8-a254-39d9-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>38a46143-9e51-36fa-0000-000000000000</id>
<parentGroupId>98352fd8-023e-3c2f-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>98352fd8-023e-3c2f-0000-000000000000</groupId>
<id>2be98623-184a-309f-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>98352fd8-023e-3c2f-0000-000000000000</groupId>
<id>08737b4f-305c-38a0-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>3b885f82-7b16-3a11-0000-000000000000</id>
<parentGroupId>98352fd8-023e-3c2f-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>98352fd8-023e-3c2f-0000-000000000000</groupId>
<id>30916515-e4a0-3a26-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>98352fd8-023e-3c2f-0000-000000000000</groupId>
<id>35ca3968-bad4-3cc0-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>6b5bd100-9448-32c5-0000-000000000000</id>
<parentGroupId>98352fd8-023e-3c2f-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>98352fd8-023e-3c2f-0000-000000000000</groupId>
<id>d8f1feb8-a254-39d9-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>98352fd8-023e-3c2f-0000-000000000000</groupId>
<id>d1395405-5ea4-3933-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<processors>
<id>d1395405-5ea4-3933-0000-000000000000</id>
<parentGroupId>98352fd8-023e-3c2f-0000-000000000000</parentGroupId>
<position>
<x>3162.1960193502323</x>
<y>699.3454457387926</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>File Size</key>
<value>
<name>File Size</name>
</value>
</entry>
<entry>
<key>Batch Size</key>
<value>
<name>Batch Size</name>
</value>
</entry>
<entry>
<key>Data Format</key>
<value>
<name>Data Format</name>
</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>
<name>Unique FlowFiles</name>
</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>
<name>generate-ff-custom-text</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>File Size</key>
<value>0B</value>
</entry>
<entry>
<key>Batch Size</key>
<value>1</value>
</entry>
<entry>
<key>Data Format</key>
<value>Text</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>false</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>1 m</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Setup initial state</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.GenerateFlowFile</type>
</processors>
<processors>
<id>d8f1feb8-a254-39d9-0000-000000000000</id>
<parentGroupId>98352fd8-023e-3c2f-0000-000000000000</parentGroupId>
<position>
<x>3161.1602283346074</x>
<y>905.6399397573473</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Cache Entry Identifier</key>
<value>
<name>Cache Entry Identifier</name>
</value>
</entry>
<entry>
<key>Distributed Cache Service</key>
<value>
<identifiesControllerService>org.apache.nifi.distributed.cache.client.DistributedMapCacheClient</identifiesControllerService>
<name>Distributed Cache Service</name>
</value>
</entry>
<entry>
<key>Put Cache Value In Attribute</key>
<value>
<name>Put Cache Value In Attribute</name>
</value>
</entry>
<entry>
<key>Max Length To Put In Attribute</key>
<value>
<name>Max Length To Put In Attribute</name>
</value>
</entry>
<entry>
<key>Character Set</key>
<value>
<name>Character Set</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Cache Entry Identifier</key>
<value>functionId</value>
</entry>
<entry>
<key>Distributed Cache Service</key>
<value>bd6674a6-3b20-3cdb-0000-000000000000</value>
</entry>
<entry>
<key>Put Cache Value In Attribute</key>
</entry>
<entry>
<key>Max Length To Put In Attribute</key>
<value>256</value>
</entry>
<entry>
<key>Character Set</key>
<value>UTF-8</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>FetchDistributedMapCache</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>not-found</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.FetchDistributedMapCache</type>
</processors>
<processors>
<id>08737b4f-305c-38a0-0000-000000000000</id>
<parentGroupId>98352fd8-023e-3c2f-0000-000000000000</parentGroupId>
<position>
<x>2639.880267397107</x>
<y>905.6399397573473</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Rate Control Criteria</key>
<value>
<name>Rate Control Criteria</name>
</value>
</entry>
<entry>
<key>Maximum Rate</key>
<value>
<name>Maximum Rate</name>
</value>
</entry>
<entry>
<key>Rate Controlled Attribute</key>
<value>
<name>Rate Controlled Attribute</name>
</value>
</entry>
<entry>
<key>Time Duration</key>
<value>
<name>Time Duration</name>
</value>
</entry>
<entry>
<key>Grouping Attribute</key>
<value>
<name>Grouping Attribute</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Rate Control Criteria</key>
<value>flowfile count</value>
</entry>
<entry>
<key>Maximum Rate</key>
<value>1</value>
</entry>
<entry>
<key>Rate Controlled Attribute</key>
</entry>
<entry>
<key>Time Duration</key>
<value>10 sec</value>
</entry>
<entry>
<key>Grouping Attribute</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Some time consuming process</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.ControlRate</type>
</processors>
<processors>
<id>26899782-7cd2-3c80-0000-000000000000</id>
<parentGroupId>98352fd8-023e-3c2f-0000-000000000000</parentGroupId>
<position>
<x>2645.6357166158573</x>
<y>1356.4797358999253</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Log Level</key>
<value>
<name>Log Level</name>
</value>
</entry>
<entry>
<key>Log Payload</key>
<value>
<name>Log Payload</name>
</value>
</entry>
<entry>
<key>Attributes to Log</key>
<value>
<name>Attributes to Log</name>
</value>
</entry>
<entry>
<key>Attributes to Ignore</key>
<value>
<name>Attributes to Ignore</name>
</value>
</entry>
<entry>
<key>Log prefix</key>
<value>
<name>Log prefix</name>
</value>
</entry>
<entry>
<key>character-set</key>
<value>
<name>character-set</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Log Level</key>
<value>info</value>
</entry>
<entry>
<key>Log Payload</key>
<value>false</value>
</entry>
<entry>
<key>Attributes to Log</key>
</entry>
<entry>
<key>Attributes to Ignore</key>
</entry>
<entry>
<key>Log prefix</key>
</entry>
<entry>
<key>character-set</key>
<value>UTF-8</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>LogAttribute</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.LogAttribute</type>
</processors>
<processors>
<id>2be98623-184a-309f-0000-000000000000</id>
<parentGroupId>98352fd8-023e-3c2f-0000-000000000000</parentGroupId>
<position>
<x>3158.279808412732</x>
<y>1151.880075255394</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>release-signal-id</key>
<value>
<name>release-signal-id</name>
</value>
</entry>
<entry>
<key>signal-counter-name</key>
<value>
<name>signal-counter-name</name>
</value>
</entry>
<entry>
<key>signal-counter-delta</key>
<value>
<name>signal-counter-delta</name>
</value>
</entry>
<entry>
<key>signal-buffer-count</key>
<value>
<name>signal-buffer-count</name>
</value>
</entry>
<entry>
<key>distributed-cache-service</key>
<value>
<identifiesControllerService>org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient</identifiesControllerService>
<name>distributed-cache-service</name>
</value>
</entry>
<entry>
<key>attribute-cache-regex</key>
<value>
<name>attribute-cache-regex</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>release-signal-id</key>
<value>functionId</value>
</entry>
<entry>
<key>signal-counter-name</key>
<value>default</value>
</entry>
<entry>
<key>signal-counter-delta</key>
<value>1</value>
</entry>
<entry>
<key>signal-buffer-count</key>
<value>1</value>
</entry>
<entry>
<key>distributed-cache-service</key>
<value>bd6674a6-3b20-3cdb-0000-000000000000</value>
</entry>
<entry>
<key>attribute-cache-regex</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Notify</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style>
<entry>
<key>background-color</key>
<value>#7ccf89</value>
</entry>
</style>
<type>org.apache.nifi.processors.standard.Notify</type>
</processors>
<processors>
<id>30916515-e4a0-3a26-0000-000000000000</id>
<parentGroupId>98352fd8-023e-3c2f-0000-000000000000</parentGroupId>
<position>
<x>2172.600023256482</x>
<y>905.6399397573473</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>release-signal-id</key>
<value>
<name>release-signal-id</name>
</value>
</entry>
<entry>
<key>target-signal-count</key>
<value>
<name>target-signal-count</name>
</value>
</entry>
<entry>
<key>signal-counter-name</key>
<value>
<name>signal-counter-name</name>
</value>
</entry>
<entry>
<key>wait-buffer-count</key>
<value>
<name>wait-buffer-count</name>
</value>
</entry>
<entry>
<key>releasable-flowfile-count</key>
<value>
<name>releasable-flowfile-count</name>
</value>
</entry>
<entry>
<key>expiration-duration</key>
<value>
<name>expiration-duration</name>
</value>
</entry>
<entry>
<key>distributed-cache-service</key>
<value>
<identifiesControllerService>org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient</identifiesControllerService>
<name>distributed-cache-service</name>
</value>
</entry>
<entry>
<key>attribute-copy-mode</key>
<value>
<name>attribute-copy-mode</name>
</value>
</entry>
<entry>
<key>wait-mode</key>
<value>
<name>wait-mode</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>release-signal-id</key>
<value>functionId</value>
</entry>
<entry>
<key>target-signal-count</key>
<value>1</value>
</entry>
<entry>
<key>signal-counter-name</key>
</entry>
<entry>
<key>wait-buffer-count</key>
<value>1</value>
</entry>
<entry>
<key>releasable-flowfile-count</key>
<value>1</value>
</entry>
<entry>
<key>expiration-duration</key>
<value>10 min</value>
</entry>
<entry>
<key>distributed-cache-service</key>
<value>bd6674a6-3b20-3cdb-0000-000000000000</value>
</entry>
<entry>
<key>attribute-copy-mode</key>
<value>keeporiginal</value>
</entry>
<entry>
<key>wait-mode</key>
<value>keep</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Wait</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>expired</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>wait</name>
</relationships>
<style>
<entry>
<key>background-color</key>
<value>#7ccf89</value>
</entry>
</style>
<type>org.apache.nifi.processors.standard.Wait</type>
</processors>
<processors>
<id>35ca3968-bad4-3cc0-0000-000000000000</id>
<parentGroupId>98352fd8-023e-3c2f-0000-000000000000</parentGroupId>
<position>
<x>2174.032152162732</x>
<y>699.3454457387926</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>File Size</key>
<value>
<name>File Size</name>
</value>
</entry>
<entry>
<key>Batch Size</key>
<value>
<name>Batch Size</name>
</value>
</entry>
<entry>
<key>Data Format</key>
<value>
<name>Data Format</name>
</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>
<name>Unique FlowFiles</name>
</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>
<name>generate-ff-custom-text</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>File Size</key>
<value>0B</value>
</entry>
<entry>
<key>Batch Size</key>
<value>1</value>
</entry>
<entry>
<key>Data Format</key>
<value>Text</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>false</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>${now()}</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>3 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Input</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.GenerateFlowFile</type>
</processors>
</contents>
<name>Using WaitNotify to limit flow</name>
</processGroups>
</snippet>
<timestamp>07/11/2017 13:12:45 JST</timestamp>
</template>
@jpierre
Copy link

jpierre commented May 24, 2020

Hello @ijokarumawak, i have a question, in the image you shows after wait processor there are 6 flowfiles queue. I think wait processor have to release just one by one flowfiles. I think there's a mistake in the way you process all files.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment