Skip to content

Instantly share code, notes, and snippets.

@ijokarumawak
Last active February 15, 2022 11:16
Show Gist options
  • Save ijokarumawak/4a9189ac630cf6cf6cd2d35c19b43fd8 to your computer and use it in GitHub Desktop.
Save ijokarumawak/4a9189ac630cf6cf6cd2d35c19b43fd8 to your computer and use it in GitHub Desktop.
NiFi Example: SQS Consumer Distribution

NiFi Example: SQS Consumer Distribution

This Gist contains a NiFi flow template that utilizes NiFi backpressure mechanizm to distribute load among multiple consumers.

This pattern is useful to consume large data from AWS, downloading large files from S3 for instance.

The key point is configure GetSQS processor Batch Size to 1. It defaults to 10, so you may not be able to see expected distribution with small number of messages.

Set Back Pressure Object Threshold to 1 at the success relationship from GetSQS, so that only one flow file can be processed at a time. While there's a flow file in this relationship, GetSQS is not scheduled to run again, and other GetSQS will get message from the queue.

The example flow accumulates received contents from SQS, and adds an attribute so that we can see which consumer received which message. This is designed to run on a standalone NiFi instance, but simulates how it can work on a NiFi cluster among nodes.

After every message is processed, we can see a merged result as follows:

Received by p1: hello. Tue Feb 07 12:07:12 JST 2017
Received by p2: hello. Tue Feb 07 12:07:13 JST 2017
Received by p1: hello. Tue Feb 07 12:07:18 JST 2017
Received by p2: hello. Tue Feb 07 12:07:18 JST 2017
Received by p1: hello. Tue Feb 07 12:07:23 JST 2017
Received by p2: hello. Tue Feb 07 12:07:23 JST 2017
Received by p1: hello. Tue Feb 07 12:07:28 JST 2017
Received by p2: hello. Tue Feb 07 12:07:28 JST 2017
Received by p1: hello. Tue Feb 07 12:07:33 JST 2017
Received by p2: hello. Tue Feb 07 12:07:33 JST 2017
<?xml version="1.0" ?>
<template encoding-version="1.0">
<description>Use Back Pressure setting to distribute incoming message from Amazon SQS.</description>
<groupId>016ccfde-015a-1000-95c4-d82dda5e14e0</groupId>
<name>SQS Consumer Distribution Example</name>
<snippet>
<processGroups>
<id>166b671c-015a-1000-0000-000000000000</id>
<parentGroupId>016ccfde-015a-1000-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>0.0</y>
</position>
<comments>Use Back Pressure setting to distribute incoming message from Amazon SQS.</comments>
<contents>
<connections>
<id>015a1001-84b3-166b-0000-000000000000</id>
<parentGroupId>166b671c-015a-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>1</backPressureObjectThreshold>
<bends>
<x>252.50624084472656</x>
<y>197.11070251464844</y>
</bends>
<destination>
<groupId>166b671c-015a-1000-0000-000000000000</groupId>
<id>015a1012-84b3-166b-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>0</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>166b671c-015a-1000-0000-000000000000</groupId>
<id>015a1000-84b3-166b-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>015a100e-84b3-166b-0000-000000000000</id>
<parentGroupId>166b671c-015a-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>-63.96377944946289</x>
<y>708.7372436523438</y>
</bends>
<destination>
<groupId>166b671c-015a-1000-0000-000000000000</groupId>
<id>015a101a-84b3-166b-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>0</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>166b671c-015a-1000-0000-000000000000</groupId>
<id>015a100d-84b3-166b-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>015a1015-84b3-166b-0000-000000000000</id>
<parentGroupId>166b671c-015a-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>255.1434783935547</x>
<y>344.7967224121094</y>
</bends>
<destination>
<groupId>166b671c-015a-1000-0000-000000000000</groupId>
<id>015a1007-84b3-166b-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>0</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>166b671c-015a-1000-0000-000000000000</groupId>
<id>015a1012-84b3-166b-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>015a1016-84b3-166b-0000-000000000000</id>
<parentGroupId>166b671c-015a-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>166b671c-015a-1000-0000-000000000000</groupId>
<id>015a100d-84b3-166b-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>166b671c-015a-1000-0000-000000000000</groupId>
<id>015a1007-84b3-166b-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>015a1017-84b3-166b-0000-000000000000</id>
<parentGroupId>166b671c-015a-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>1</backPressureObjectThreshold>
<bends>
<x>252.50624084472656</x>
<y>259.0860900878906</y>
</bends>
<destination>
<groupId>166b671c-015a-1000-0000-000000000000</groupId>
<id>015a1004-84b3-166b-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>0</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>166b671c-015a-1000-0000-000000000000</groupId>
<id>166b84b3-015a-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>015a1018-84b3-166b-0000-000000000000</id>
<parentGroupId>166b671c-015a-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>166b671c-015a-1000-0000-000000000000</groupId>
<id>015a100d-84b3-166b-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>166b671c-015a-1000-0000-000000000000</groupId>
<id>015a1009-84b3-166b-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>015a1019-84b3-166b-0000-000000000000</id>
<parentGroupId>166b671c-015a-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>256.46209716796875</x>
<y>410.72796630859375</y>
</bends>
<destination>
<groupId>166b671c-015a-1000-0000-000000000000</groupId>
<id>015a1009-84b3-166b-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>0</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>166b671c-015a-1000-0000-000000000000</groupId>
<id>015a1004-84b3-166b-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>015a101c-84b3-166b-0000-000000000000</id>
<parentGroupId>166b671c-015a-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>443.7068786621094</x>
<y>869.6094970703125</y>
</bends>
<destination>
<groupId>166b671c-015a-1000-0000-000000000000</groupId>
<id>166d8594-015a-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>0</labelIndex>
<name></name>
<selectedRelationships>merged</selectedRelationships>
<source>
<groupId>166b671c-015a-1000-0000-000000000000</groupId>
<id>015a101a-84b3-166b-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>166f7637-015a-1000-0000-000000000000</id>
<parentGroupId>166b671c-015a-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>166b671c-015a-1000-0000-000000000000</groupId>
<id>166dc084-015a-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>166b671c-015a-1000-0000-000000000000</groupId>
<id>166ddd7b-015a-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>166f9d3e-015a-1000-0000-000000000000</id>
<parentGroupId>166b671c-015a-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>950.0588989257812</x>
<y>765.4381103515625</y>
</bends>
<destination>
<groupId>166b671c-015a-1000-0000-000000000000</groupId>
<id>166d8594-015a-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>0</labelIndex>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<source>
<groupId>166b671c-015a-1000-0000-000000000000</groupId>
<id>166dc084-015a-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>166fb846-015a-1000-0000-000000000000</id>
<parentGroupId>166b671c-015a-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>640.1820068359375</x>
<y>596.6541137695312</y>
</bends>
<destination>
<groupId>166b671c-015a-1000-0000-000000000000</groupId>
<id>166d8594-015a-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>166b671c-015a-1000-0000-000000000000</groupId>
<id>166dc084-015a-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<labels>
<id>015a101f-84b3-166b-0000-000000000000</id>
<parentGroupId>166b671c-015a-1000-0000-000000000000</parentGroupId>
<position>
<x>765.3355316034548</x>
<y>295.52543907531197</y>
</position>
<height>162.67300415039062</height>
<label>Restart this to retest.</label>
<style>
<entry>
<key>font-size</key>
<value>12px</value>
</entry>
</style>
<width>373.2867736816406</width>
</labels>
<processors>
<id>015a1000-84b3-166b-0000-000000000000</id>
<parentGroupId>166b671c-015a-1000-0000-000000000000</parentGroupId>
<position>
<x>-220.4754709791963</x>
<y>63.53899642144199</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Queue URL</key>
<value>
<name>Queue URL</name>
</value>
</entry>
<entry>
<key>Auto Delete Messages</key>
<value>
<name>Auto Delete Messages</name>
</value>
</entry>
<entry>
<key>Access Key</key>
<value>
<name>Access Key</name>
</value>
</entry>
<entry>
<key>Secret Key</key>
<value>
<name>Secret Key</name>
</value>
</entry>
<entry>
<key>Credentials File</key>
<value>
<name>Credentials File</name>
</value>
</entry>
<entry>
<key>AWS Credentials Provider service</key>
<value>
<identifiesControllerService>org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService</identifiesControllerService>
<name>AWS Credentials Provider service</name>
</value>
</entry>
<entry>
<key>Region</key>
<value>
<name>Region</name>
</value>
</entry>
<entry>
<key>Batch Size</key>
<value>
<name>Batch Size</name>
</value>
</entry>
<entry>
<key>Communications Timeout</key>
<value>
<name>Communications Timeout</name>
</value>
</entry>
<entry>
<key>Character Set</key>
<value>
<name>Character Set</name>
</value>
</entry>
<entry>
<key>Visibility Timeout</key>
<value>
<name>Visibility Timeout</name>
</value>
</entry>
<entry>
<key>Receive Message Wait Time</key>
<value>
<name>Receive Message Wait Time</name>
</value>
</entry>
<entry>
<key>Proxy Host</key>
<value>
<name>Proxy Host</name>
</value>
</entry>
<entry>
<key>Proxy Host Port</key>
<value>
<name>Proxy Host Port</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Queue URL</key>
<value></value>
</entry>
<entry>
<key>Auto Delete Messages</key>
<value>true</value>
</entry>
<entry>
<key>Access Key</key>
</entry>
<entry>
<key>Secret Key</key>
</entry>
<entry>
<key>Credentials File</key>
</entry>
<entry>
<key>AWS Credentials Provider service</key>
</entry>
<entry>
<key>Region</key>
<value>us-west-1</value>
</entry>
<entry>
<key>Batch Size</key>
<value>1</value>
</entry>
<entry>
<key>Communications Timeout</key>
<value>30 secs</value>
</entry>
<entry>
<key>Character Set</key>
<value>UTF-8</value>
</entry>
<entry>
<key>Visibility Timeout</key>
<value>15 mins</value>
</entry>
<entry>
<key>Receive Message Wait Time</key>
<value>0 sec</value>
</entry>
<entry>
<key>Proxy Host</key>
</entry>
<entry>
<key>Proxy Host Port</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>GetSQS</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style>
<entry>
<key>background-color</key>
<value>#ffb657</value>
</entry>
</style>
<type>org.apache.nifi.processors.aws.sqs.GetSQS</type>
</processors>
<processors>
<id>015a1004-84b3-166b-0000-000000000000</id>
<parentGroupId>166b671c-015a-1000-0000-000000000000</parentGroupId>
<position>
<x>388.4518227982695</x>
<y>205.64633628045078</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Rate Control Criteria</key>
<value>
<name>Rate Control Criteria</name>
</value>
</entry>
<entry>
<key>Maximum Rate</key>
<value>
<name>Maximum Rate</name>
</value>
</entry>
<entry>
<key>Rate Controlled Attribute</key>
<value>
<name>Rate Controlled Attribute</name>
</value>
</entry>
<entry>
<key>Time Duration</key>
<value>
<name>Time Duration</name>
</value>
</entry>
<entry>
<key>Grouping Attribute</key>
<value>
<name>Grouping Attribute</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Rate Control Criteria</key>
<value>flowfile count</value>
</entry>
<entry>
<key>Maximum Rate</key>
<value>1</value>
</entry>
<entry>
<key>Rate Controlled Attribute</key>
</entry>
<entry>
<key>Time Duration</key>
<value>5 sec</value>
</entry>
<entry>
<key>Grouping Attribute</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>1 flow file / 5 sec</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.ControlRate</type>
</processors>
<processors>
<id>015a1007-84b3-166b-0000-000000000000</id>
<parentGroupId>166b671c-015a-1000-0000-000000000000</parentGroupId>
<position>
<x>-220.4754709791963</x>
<y>343.98695822869297</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Delete Attributes Expression</key>
<value>
<name>Delete Attributes Expression</name>
</value>
</entry>
<entry>
<key>Store State</key>
<value>
<name>Store State</name>
</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
<value>
<name>Stateful Variables Initial Value</name>
</value>
</entry>
<entry>
<key>received.by</key>
<value>
<name>received.by</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Delete Attributes Expression</key>
</entry>
<entry>
<key>Store State</key>
<value>Do not store state</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
</entry>
<entry>
<key>received.by</key>
<value>p1</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Received by p1</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
<processors>
<id>015a1009-84b3-166b-0000-000000000000</id>
<parentGroupId>166b671c-015a-1000-0000-000000000000</parentGroupId>
<position>
<x>388.4518227982695</x>
<y>344.22183676873203</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Delete Attributes Expression</key>
<value>
<name>Delete Attributes Expression</name>
</value>
</entry>
<entry>
<key>Store State</key>
<value>
<name>Store State</name>
</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
<value>
<name>Stateful Variables Initial Value</name>
</value>
</entry>
<entry>
<key>received.by</key>
<value>
<name>received.by</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Delete Attributes Expression</key>
</entry>
<entry>
<key>Store State</key>
<value>Do not store state</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
</entry>
<entry>
<key>received.by</key>
<value>p2</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Received by p2</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
<processors>
<id>015a100d-84b3-166b-0000-000000000000</id>
<parentGroupId>166b671c-015a-1000-0000-000000000000</parentGroupId>
<position>
<x>61.38933805705858</x>
<y>554.5455062023258</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Regular Expression</key>
<value>
<name>Regular Expression</name>
</value>
</entry>
<entry>
<key>Replacement Value</key>
<value>
<name>Replacement Value</name>
</value>
</entry>
<entry>
<key>Character Set</key>
<value>
<name>Character Set</name>
</value>
</entry>
<entry>
<key>Maximum Buffer Size</key>
<value>
<name>Maximum Buffer Size</name>
</value>
</entry>
<entry>
<key>Replacement Strategy</key>
<value>
<name>Replacement Strategy</name>
</value>
</entry>
<entry>
<key>Evaluation Mode</key>
<value>
<name>Evaluation Mode</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Regular Expression</key>
<value>(?s)(^.*$)</value>
</entry>
<entry>
<key>Replacement Value</key>
<value>Received by ${received.by}: $1. ${now()}
</value>
</entry>
<entry>
<key>Character Set</key>
<value>UTF-8</value>
</entry>
<entry>
<key>Maximum Buffer Size</key>
<value>1 MB</value>
</entry>
<entry>
<key>Replacement Strategy</key>
<value>Regex Replace</value>
</entry>
<entry>
<key>Evaluation Mode</key>
<value>Entire text</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>ReplaceText</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.ReplaceText</type>
</processors>
<processors>
<id>015a1012-84b3-166b-0000-000000000000</id>
<parentGroupId>166b671c-015a-1000-0000-000000000000</parentGroupId>
<position>
<x>-220.4754709791963</x>
<y>204.43100424920078</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Rate Control Criteria</key>
<value>
<name>Rate Control Criteria</name>
</value>
</entry>
<entry>
<key>Maximum Rate</key>
<value>
<name>Maximum Rate</name>
</value>
</entry>
<entry>
<key>Rate Controlled Attribute</key>
<value>
<name>Rate Controlled Attribute</name>
</value>
</entry>
<entry>
<key>Time Duration</key>
<value>
<name>Time Duration</name>
</value>
</entry>
<entry>
<key>Grouping Attribute</key>
<value>
<name>Grouping Attribute</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Rate Control Criteria</key>
<value>flowfile count</value>
</entry>
<entry>
<key>Maximum Rate</key>
<value>1</value>
</entry>
<entry>
<key>Rate Controlled Attribute</key>
</entry>
<entry>
<key>Time Duration</key>
<value>5 sec</value>
</entry>
<entry>
<key>Grouping Attribute</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>1 flow file / 5 sec</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.ControlRate</type>
</processors>
<processors>
<id>015a101a-84b3-166b-0000-000000000000</id>
<parentGroupId>166b671c-015a-1000-0000-000000000000</parentGroupId>
<position>
<x>59.52975864169724</x>
<y>698.2619842360833</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Merge Strategy</key>
<value>
<name>Merge Strategy</name>
</value>
</entry>
<entry>
<key>Merge Format</key>
<value>
<name>Merge Format</name>
</value>
</entry>
<entry>
<key>Attribute Strategy</key>
<value>
<name>Attribute Strategy</name>
</value>
</entry>
<entry>
<key>Correlation Attribute Name</key>
<value>
<name>Correlation Attribute Name</name>
</value>
</entry>
<entry>
<key>Minimum Number of Entries</key>
<value>
<name>Minimum Number of Entries</name>
</value>
</entry>
<entry>
<key>Maximum Number of Entries</key>
<value>
<name>Maximum Number of Entries</name>
</value>
</entry>
<entry>
<key>Minimum Group Size</key>
<value>
<name>Minimum Group Size</name>
</value>
</entry>
<entry>
<key>Maximum Group Size</key>
<value>
<name>Maximum Group Size</name>
</value>
</entry>
<entry>
<key>Max Bin Age</key>
<value>
<name>Max Bin Age</name>
</value>
</entry>
<entry>
<key>Maximum number of Bins</key>
<value>
<name>Maximum number of Bins</name>
</value>
</entry>
<entry>
<key>Delimiter Strategy</key>
<value>
<name>Delimiter Strategy</name>
</value>
</entry>
<entry>
<key>Header File</key>
<value>
<name>Header File</name>
</value>
</entry>
<entry>
<key>Footer File</key>
<value>
<name>Footer File</name>
</value>
</entry>
<entry>
<key>Demarcator File</key>
<value>
<name>Demarcator File</name>
</value>
</entry>
<entry>
<key>Compression Level</key>
<value>
<name>Compression Level</name>
</value>
</entry>
<entry>
<key>Keep Path</key>
<value>
<name>Keep Path</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Merge Strategy</key>
<value>Bin-Packing Algorithm</value>
</entry>
<entry>
<key>Merge Format</key>
<value>Binary Concatenation</value>
</entry>
<entry>
<key>Attribute Strategy</key>
<value>Keep Only Common Attributes</value>
</entry>
<entry>
<key>Correlation Attribute Name</key>
</entry>
<entry>
<key>Minimum Number of Entries</key>
<value>10</value>
</entry>
<entry>
<key>Maximum Number of Entries</key>
<value>1000</value>
</entry>
<entry>
<key>Minimum Group Size</key>
<value>0 B</value>
</entry>
<entry>
<key>Maximum Group Size</key>
</entry>
<entry>
<key>Max Bin Age</key>
</entry>
<entry>
<key>Maximum number of Bins</key>
<value>5</value>
</entry>
<entry>
<key>Delimiter Strategy</key>
<value>Filename</value>
</entry>
<entry>
<key>Header File</key>
</entry>
<entry>
<key>Footer File</key>
</entry>
<entry>
<key>Demarcator File</key>
</entry>
<entry>
<key>Compression Level</key>
<value>1</value>
</entry>
<entry>
<key>Keep Path</key>
<value>false</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>MergeContent</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>merged</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>original</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.MergeContent</type>
</processors>
<processors>
<id>166b84b3-015a-1000-0000-000000000000</id>
<parentGroupId>166b671c-015a-1000-0000-000000000000</parentGroupId>
<position>
<x>388.4518227982695</x>
<y>67.49487754021641</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Queue URL</key>
<value>
<name>Queue URL</name>
</value>
</entry>
<entry>
<key>Auto Delete Messages</key>
<value>
<name>Auto Delete Messages</name>
</value>
</entry>
<entry>
<key>Access Key</key>
<value>
<name>Access Key</name>
</value>
</entry>
<entry>
<key>Secret Key</key>
<value>
<name>Secret Key</name>
</value>
</entry>
<entry>
<key>Credentials File</key>
<value>
<name>Credentials File</name>
</value>
</entry>
<entry>
<key>AWS Credentials Provider service</key>
<value>
<identifiesControllerService>org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService</identifiesControllerService>
<name>AWS Credentials Provider service</name>
</value>
</entry>
<entry>
<key>Region</key>
<value>
<name>Region</name>
</value>
</entry>
<entry>
<key>Batch Size</key>
<value>
<name>Batch Size</name>
</value>
</entry>
<entry>
<key>Communications Timeout</key>
<value>
<name>Communications Timeout</name>
</value>
</entry>
<entry>
<key>Character Set</key>
<value>
<name>Character Set</name>
</value>
</entry>
<entry>
<key>Visibility Timeout</key>
<value>
<name>Visibility Timeout</name>
</value>
</entry>
<entry>
<key>Receive Message Wait Time</key>
<value>
<name>Receive Message Wait Time</name>
</value>
</entry>
<entry>
<key>Proxy Host</key>
<value>
<name>Proxy Host</name>
</value>
</entry>
<entry>
<key>Proxy Host Port</key>
<value>
<name>Proxy Host Port</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Queue URL</key>
<value></value>
</entry>
<entry>
<key>Auto Delete Messages</key>
<value>true</value>
</entry>
<entry>
<key>Access Key</key>
</entry>
<entry>
<key>Secret Key</key>
</entry>
<entry>
<key>Credentials File</key>
</entry>
<entry>
<key>AWS Credentials Provider service</key>
</entry>
<entry>
<key>Region</key>
<value>us-west-1</value>
</entry>
<entry>
<key>Batch Size</key>
<value>1</value>
</entry>
<entry>
<key>Communications Timeout</key>
<value>30 secs</value>
</entry>
<entry>
<key>Character Set</key>
<value>UTF-8</value>
</entry>
<entry>
<key>Visibility Timeout</key>
<value>15 mins</value>
</entry>
<entry>
<key>Receive Message Wait Time</key>
<value>0 sec</value>
</entry>
<entry>
<key>Proxy Host</key>
</entry>
<entry>
<key>Proxy Host Port</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>GetSQS</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style>
<entry>
<key>background-color</key>
<value>#ffb657</value>
</entry>
</style>
<type>org.apache.nifi.processors.aws.sqs.GetSQS</type>
</processors>
<processors>
<id>166d8594-015a-1000-0000-000000000000</id>
<parentGroupId>166b671c-015a-1000-0000-000000000000</parentGroupId>
<position>
<x>464.99291471721483</x>
<y>700.4266097179508</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Delete Attributes Expression</key>
<value>
<name>Delete Attributes Expression</name>
</value>
</entry>
<entry>
<key>Store State</key>
<value>
<name>Store State</name>
</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
<value>
<name>Stateful Variables Initial Value</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Delete Attributes Expression</key>
</entry>
<entry>
<key>Store State</key>
<value>Do not store state</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Terminator for DEBUG</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
<processors>
<id>166dc084-015a-1000-0000-000000000000</id>
<parentGroupId>166b671c-015a-1000-0000-000000000000</parentGroupId>
<position>
<x>775.5407967972928</x>
<y>527.8567549816227</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Queue URL</key>
<value>
<name>Queue URL</name>
</value>
</entry>
<entry>
<key>Access Key</key>
<value>
<name>Access Key</name>
</value>
</entry>
<entry>
<key>Secret Key</key>
<value>
<name>Secret Key</name>
</value>
</entry>
<entry>
<key>Credentials File</key>
<value>
<name>Credentials File</name>
</value>
</entry>
<entry>
<key>AWS Credentials Provider service</key>
<value>
<identifiesControllerService>org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService</identifiesControllerService>
<name>AWS Credentials Provider service</name>
</value>
</entry>
<entry>
<key>Region</key>
<value>
<name>Region</name>
</value>
</entry>
<entry>
<key>Delay</key>
<value>
<name>Delay</name>
</value>
</entry>
<entry>
<key>Communications Timeout</key>
<value>
<name>Communications Timeout</name>
</value>
</entry>
<entry>
<key>Proxy Host</key>
<value>
<name>Proxy Host</name>
</value>
</entry>
<entry>
<key>Proxy Host Port</key>
<value>
<name>Proxy Host Port</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Queue URL</key>
<value></value>
</entry>
<entry>
<key>Access Key</key>
</entry>
<entry>
<key>Secret Key</key>
</entry>
<entry>
<key>Credentials File</key>
</entry>
<entry>
<key>AWS Credentials Provider service</key>
</entry>
<entry>
<key>Region</key>
<value>us-west-1</value>
</entry>
<entry>
<key>Delay</key>
<value>0 secs</value>
</entry>
<entry>
<key>Communications Timeout</key>
<value>30 secs</value>
</entry>
<entry>
<key>Proxy Host</key>
</entry>
<entry>
<key>Proxy Host Port</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>PutSQS</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style>
<entry>
<key>background-color</key>
<value>#ffb657</value>
</entry>
</style>
<type>org.apache.nifi.processors.aws.sqs.PutSQS</type>
</processors>
<processors>
<id>166ddd7b-015a-1000-0000-000000000000</id>
<parentGroupId>166b671c-015a-1000-0000-000000000000</parentGroupId>
<position>
<x>775.540796797293</x>
<y>320.5951430431461</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>File Size</key>
<value>
<name>File Size</name>
</value>
</entry>
<entry>
<key>Batch Size</key>
<value>
<name>Batch Size</name>
</value>
</entry>
<entry>
<key>Data Format</key>
<value>
<name>Data Format</name>
</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>
<name>Unique FlowFiles</name>
</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>
<name>generate-ff-custom-text</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>File Size</key>
<value>0B</value>
</entry>
<entry>
<key>Batch Size</key>
<value>10</value>
</entry>
<entry>
<key>Data Format</key>
<value>Text</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>false</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>hello</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>1d</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Generate 10 flow files</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.GenerateFlowFile</type>
</processors>
</contents>
<name>SQS Consumer Distribution Example</name>
</processGroups>
</snippet>
<timestamp>02/07/2017 12:09:16 JST</timestamp>
</template>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment