Skip to content

Instantly share code, notes, and snippets.

@ijokarumawak
Last active February 15, 2022 11:16
Show Gist options
  • Save ijokarumawak/29a568f760cb24fb94ae3384037ab9e7 to your computer and use it in GitHub Desktop.
Save ijokarumawak/29a568f760cb24fb94ae3384037ab9e7 to your computer and use it in GitHub Desktop.

Sample NiFi Kafka data flow to verify Producer/Consumer flow file counts

  1. GenerateFlowFile -> PublishKafka -> topic A
  2. topic A -> ConsumeKafka -> PublishKafka -> topic B
  3. topic B -> ConsumeKafka

The GenerateFlowFile processor is scheduled to run once every second and emits a batch of 10 FlowFiles per run (Batch Size = 10). The consumers were started before GenerateFlowFile was started.

kafka_2.11-0.10.1.0$ bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group nifi
GROUP                          TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             OWNER
nifi                           topic-A                        0          unknown         0               unknown         consumer-15_/192.168.99.1
nifi                           topic-B                        0          unknown         0               unknown         consumer-16_/192.168.99.1
$ bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group nifi
GROUP                          TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             OWNER
nifi                           topic-A                        0          2300            2300            0               consumer-15_/192.168.99.1
nifi                           topic-B                        0          2300            2300            0               consumer-16_/192.168.99.1
<?xml version="1.0" encoding="UTF-8"?>
<!-- NiFi flow template "Kafka FlowFile Count Test": GenerateFlowFile publishes to topic-A, topic-A is
     consumed and republished to topic-B, and topic-B is consumed again. All Kafka processors in this
     template point at localhost:9092. -->
<template encoding-version="1.0">
<!-- Template metadata: an empty description, a group id, and the display name of the template. -->
<description></description>
<!-- NOTE(review): presumably the id of the group the template was created from; confirm. -->
<groupId>8c7b0523-0159-1000-2fae-dcd20d8ccf1d</groupId>
<name>Kafka FlowFile Count Test</name>
<snippet>
<!-- The snippet contains one process group; every connection and processor below lives inside it
     (their parentGroupId is this group's id, 90173dee). -->
<processGroups>
<id>90173dee-0159-1000-0000-000000000000</id>
<parentGroupId>8c7b0523-0159-1000-0000-000000000000</parentGroupId>
<!-- Canvas position of the process group. -->
<position>
<x>0.0</x>
<y>0.0</y>
</position>
<comments></comments>
<!-- contents: five connections followed by eight processors. -->
<contents>
<!-- Connection 90205fa2: carries the "success" relationship of "ConsumeKafka_0_10 from A" (901b92a4)
     into "PublishKafka_0_10 to B" (904cbd5f). -->
<connections>
  <id>90205fa2-0159-1000-0000-000000000000</id>
  <parentGroupId>90173dee-0159-1000-0000-000000000000</parentGroupId>
  <!-- Back pressure thresholds for this queue: total data size and FlowFile count. -->
  <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
  <backPressureObjectThreshold>10000</backPressureObjectThreshold>
  <!-- Receiving end of the connection. -->
  <destination>
    <groupId>90173dee-0159-1000-0000-000000000000</groupId>
    <id>904cbd5f-0159-1000-0000-000000000000</id>
    <type>PROCESSOR</type>
  </destination>
  <flowFileExpiration>0 sec</flowFileExpiration>
  <labelIndex>1</labelIndex>
  <name></name>
  <selectedRelationships>success</selectedRelationships>
  <!-- Sending end of the connection. -->
  <source>
    <groupId>90173dee-0159-1000-0000-000000000000</groupId>
    <id>901b92a4-0159-1000-0000-000000000000</id>
    <type>PROCESSOR</type>
  </source>
  <zIndex>0</zIndex>
</connections>
<!-- Connection 9025c938: carries the "success" relationship of "GenerateFlowFile" (9025ba9c)
     into "PublishKafka_0_10 to A" (90250d1c). -->
<connections>
  <id>9025c938-0159-1000-0000-000000000000</id>
  <parentGroupId>90173dee-0159-1000-0000-000000000000</parentGroupId>
  <!-- Back pressure thresholds for this queue: total data size and FlowFile count. -->
  <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
  <backPressureObjectThreshold>10000</backPressureObjectThreshold>
  <!-- Receiving end of the connection. -->
  <destination>
    <groupId>90173dee-0159-1000-0000-000000000000</groupId>
    <id>90250d1c-0159-1000-0000-000000000000</id>
    <type>PROCESSOR</type>
  </destination>
  <flowFileExpiration>0 sec</flowFileExpiration>
  <labelIndex>1</labelIndex>
  <name></name>
  <selectedRelationships>success</selectedRelationships>
  <!-- Sending end of the connection. -->
  <source>
    <groupId>90173dee-0159-1000-0000-000000000000</groupId>
    <id>9025ba9c-0159-1000-0000-000000000000</id>
    <type>PROCESSOR</type>
  </source>
  <zIndex>0</zIndex>
</connections>
<!-- Connection 9037491f: carries the "success" relationship of "PublishKafka_0_10 to A" (90250d1c)
     into the "UpdateAttribute" processor 9036fa17. -->
<connections>
  <id>9037491f-0159-1000-0000-000000000000</id>
  <parentGroupId>90173dee-0159-1000-0000-000000000000</parentGroupId>
  <!-- Back pressure thresholds for this queue: total data size and FlowFile count. -->
  <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
  <backPressureObjectThreshold>10000</backPressureObjectThreshold>
  <!-- Receiving end of the connection. -->
  <destination>
    <groupId>90173dee-0159-1000-0000-000000000000</groupId>
    <id>9036fa17-0159-1000-0000-000000000000</id>
    <type>PROCESSOR</type>
  </destination>
  <flowFileExpiration>0 sec</flowFileExpiration>
  <labelIndex>0</labelIndex>
  <name></name>
  <selectedRelationships>success</selectedRelationships>
  <!-- Sending end of the connection. -->
  <source>
    <groupId>90173dee-0159-1000-0000-000000000000</groupId>
    <id>90250d1c-0159-1000-0000-000000000000</id>
    <type>PROCESSOR</type>
  </source>
  <zIndex>0</zIndex>
</connections>
<!-- Connection 903d9dd7: carries the "success" relationship of "ConsumeKafka_0_10 from B" (903d9dd5)
     into the "UpdateAttribute" processor 903d9dd6. -->
<connections>
  <id>903d9dd7-0159-1000-0000-000000000000</id>
  <parentGroupId>90173dee-0159-1000-0000-000000000000</parentGroupId>
  <!-- Back pressure thresholds for this queue: total data size and FlowFile count. -->
  <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
  <backPressureObjectThreshold>10000</backPressureObjectThreshold>
  <!-- Receiving end of the connection. -->
  <destination>
    <groupId>90173dee-0159-1000-0000-000000000000</groupId>
    <id>903d9dd6-0159-1000-0000-000000000000</id>
    <type>PROCESSOR</type>
  </destination>
  <flowFileExpiration>0 sec</flowFileExpiration>
  <labelIndex>1</labelIndex>
  <name></name>
  <selectedRelationships>success</selectedRelationships>
  <!-- Sending end of the connection. -->
  <source>
    <groupId>90173dee-0159-1000-0000-000000000000</groupId>
    <id>903d9dd5-0159-1000-0000-000000000000</id>
    <type>PROCESSOR</type>
  </source>
  <zIndex>0</zIndex>
</connections>
<!-- Connection 904d0486: carries the "success" relationship of "PublishKafka_0_10 to B" (904cbd5f)
     into the "UpdateAttribute" processor 9020522a. -->
<connections>
  <id>904d0486-0159-1000-0000-000000000000</id>
  <parentGroupId>90173dee-0159-1000-0000-000000000000</parentGroupId>
  <!-- Back pressure thresholds for this queue: total data size and FlowFile count. -->
  <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
  <backPressureObjectThreshold>10000</backPressureObjectThreshold>
  <!-- Receiving end of the connection. -->
  <destination>
    <groupId>90173dee-0159-1000-0000-000000000000</groupId>
    <id>9020522a-0159-1000-0000-000000000000</id>
    <type>PROCESSOR</type>
  </destination>
  <flowFileExpiration>0 sec</flowFileExpiration>
  <labelIndex>1</labelIndex>
  <name></name>
  <selectedRelationships>success</selectedRelationships>
  <!-- Sending end of the connection. -->
  <source>
    <groupId>90173dee-0159-1000-0000-000000000000</groupId>
    <id>904cbd5f-0159-1000-0000-000000000000</id>
    <type>PROCESSOR</type>
  </source>
  <zIndex>0</zIndex>
</connections>
<!-- Processor 901b92a4 "ConsumeKafka_0_10 from A": consumes topic-A from localhost:9092 over PLAINTEXT
     using consumer group "nifi". Its "success" output feeds "PublishKafka_0_10 to B" (904cbd5f). -->
<processors>
  <id>901b92a4-0159-1000-0000-000000000000</id>
  <parentGroupId>90173dee-0159-1000-0000-000000000000</parentGroupId>
  <position>
    <x>456.7124319622433</x>
    <y>8.541472672997699</y>
  </position>
  <config>
    <bulletinLevel>WARN</bulletinLevel>
    <comments></comments>
    <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
    <!-- descriptors: the property names this processor exposes; values are set under properties. -->
    <descriptors>
      <entry>
        <key>bootstrap.servers</key>
        <value>
          <name>bootstrap.servers</name>
        </value>
      </entry>
      <entry>
        <key>security.protocol</key>
        <value>
          <name>security.protocol</name>
        </value>
      </entry>
      <entry>
        <key>sasl.kerberos.service.name</key>
        <value>
          <name>sasl.kerberos.service.name</name>
        </value>
      </entry>
      <entry>
        <key>ssl.context.service</key>
        <value>
          <identifiesControllerService>org.apache.nifi.ssl.SSLContextService</identifiesControllerService>
          <name>ssl.context.service</name>
        </value>
      </entry>
      <entry>
        <key>topic</key>
        <value>
          <name>topic</name>
        </value>
      </entry>
      <entry>
        <key>group.id</key>
        <value>
          <name>group.id</name>
        </value>
      </entry>
      <entry>
        <key>auto.offset.reset</key>
        <value>
          <name>auto.offset.reset</name>
        </value>
      </entry>
      <entry>
        <key>key-attribute-encoding</key>
        <value>
          <name>key-attribute-encoding</name>
        </value>
      </entry>
      <entry>
        <key>message-demarcator</key>
        <value>
          <name>message-demarcator</name>
        </value>
      </entry>
      <entry>
        <key>max.poll.records</key>
        <value>
          <name>max.poll.records</name>
        </value>
      </entry>
      <entry>
        <key>max-uncommit-offset-wait</key>
        <value>
          <name>max-uncommit-offset-wait</name>
        </value>
      </entry>
    </descriptors>
    <executionNode>ALL</executionNode>
    <lossTolerant>false</lossTolerant>
    <penaltyDuration>30 sec</penaltyDuration>
    <!-- properties: configured values; an entry with a key but no value element is left unset. -->
    <properties>
      <entry>
        <key>bootstrap.servers</key>
        <value>localhost:9092</value>
      </entry>
      <entry>
        <key>security.protocol</key>
        <value>PLAINTEXT</value>
      </entry>
      <entry>
        <key>sasl.kerberos.service.name</key>
      </entry>
      <entry>
        <key>ssl.context.service</key>
      </entry>
      <entry>
        <key>topic</key>
        <value>topic-A</value>
      </entry>
      <!-- The same group id "nifi" is also used by "ConsumeKafka_0_10 from B". -->
      <entry>
        <key>group.id</key>
        <value>nifi</value>
      </entry>
      <entry>
        <key>auto.offset.reset</key>
        <value>latest</value>
      </entry>
      <entry>
        <key>key-attribute-encoding</key>
        <value>utf-8</value>
      </entry>
      <entry>
        <key>message-demarcator</key>
      </entry>
      <entry>
        <key>max.poll.records</key>
        <value>10000</value>
      </entry>
      <entry>
        <key>max-uncommit-offset-wait</key>
        <value>1 secs</value>
      </entry>
    </properties>
    <runDurationMillis>0</runDurationMillis>
    <schedulingPeriod>0 sec</schedulingPeriod>
    <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
    <yieldDuration>1 sec</yieldDuration>
  </config>
  <name>ConsumeKafka_0_10 from A</name>
  <relationships>
    <autoTerminate>false</autoTerminate>
    <name>success</name>
  </relationships>
  <style></style>
  <type>org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_0_10</type>
</processors>
<!-- Processor 9020522a "UpdateAttribute": destination of the "success" output of
     "PublishKafka_0_10 to B" (904cbd5f). No attribute rules are configured here.
     NOTE(review): its "success" relationship is neither auto-terminated nor connected by any
     connection in this template; presumably it is kept stopped so FlowFiles pile up in the
     inbound queue and can be counted. Confirm. -->
<processors>
  <id>9020522a-0159-1000-0000-000000000000</id>
  <parentGroupId>90173dee-0159-1000-0000-000000000000</parentGroupId>
  <position>
    <x>456.7124319622433</x>
    <y>451.8423536320786</y>
  </position>
  <config>
    <bulletinLevel>WARN</bulletinLevel>
    <comments></comments>
    <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
    <!-- descriptors: the property names this processor exposes; values are set under properties. -->
    <descriptors>
      <entry>
        <key>Delete Attributes Expression</key>
        <value>
          <name>Delete Attributes Expression</name>
        </value>
      </entry>
      <entry>
        <key>Store State</key>
        <value>
          <name>Store State</name>
        </value>
      </entry>
      <entry>
        <key>Stateful Variables Initial Value</key>
        <value>
          <name>Stateful Variables Initial Value</name>
        </value>
      </entry>
    </descriptors>
    <executionNode>ALL</executionNode>
    <lossTolerant>false</lossTolerant>
    <penaltyDuration>30 sec</penaltyDuration>
    <!-- properties: only Store State carries a value; the other two entries are left unset. -->
    <properties>
      <entry>
        <key>Delete Attributes Expression</key>
      </entry>
      <entry>
        <key>Store State</key>
        <value>Do not store state</value>
      </entry>
      <entry>
        <key>Stateful Variables Initial Value</key>
      </entry>
    </properties>
    <runDurationMillis>0</runDurationMillis>
    <schedulingPeriod>0 sec</schedulingPeriod>
    <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
    <yieldDuration>1 sec</yieldDuration>
  </config>
  <name>UpdateAttribute</name>
  <relationships>
    <autoTerminate>false</autoTerminate>
    <name>success</name>
  </relationships>
  <style></style>
  <type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
<!-- Processor 90250d1c "PublishKafka_0_10 to A": publishes incoming FlowFiles (from GenerateFlowFile,
     9025ba9c) to topic-A on localhost:9092 over PLAINTEXT. "failure" is auto-terminated; "success"
     feeds the "UpdateAttribute" processor 9036fa17. -->
<processors>
  <id>90250d1c-0159-1000-0000-000000000000</id>
  <parentGroupId>90173dee-0159-1000-0000-000000000000</parentGroupId>
  <position>
    <x>80.7458444547216</x>
    <y>231.28485899361976</y>
  </position>
  <config>
    <bulletinLevel>WARN</bulletinLevel>
    <comments></comments>
    <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
    <!-- descriptors: the property names this processor exposes; values are set under properties. -->
    <descriptors>
      <entry>
        <key>bootstrap.servers</key>
        <value>
          <name>bootstrap.servers</name>
        </value>
      </entry>
      <entry>
        <key>security.protocol</key>
        <value>
          <name>security.protocol</name>
        </value>
      </entry>
      <entry>
        <key>sasl.kerberos.service.name</key>
        <value>
          <name>sasl.kerberos.service.name</name>
        </value>
      </entry>
      <entry>
        <key>ssl.context.service</key>
        <value>
          <identifiesControllerService>org.apache.nifi.ssl.SSLContextService</identifiesControllerService>
          <name>ssl.context.service</name>
        </value>
      </entry>
      <entry>
        <key>topic</key>
        <value>
          <name>topic</name>
        </value>
      </entry>
      <entry>
        <key>acks</key>
        <value>
          <name>acks</name>
        </value>
      </entry>
      <entry>
        <key>kafka-key</key>
        <value>
          <name>kafka-key</name>
        </value>
      </entry>
      <entry>
        <key>key-attribute-encoding</key>
        <value>
          <name>key-attribute-encoding</name>
        </value>
      </entry>
      <entry>
        <key>message-demarcator</key>
        <value>
          <name>message-demarcator</name>
        </value>
      </entry>
      <entry>
        <key>max.request.size</key>
        <value>
          <name>max.request.size</name>
        </value>
      </entry>
      <entry>
        <key>ack.wait.time</key>
        <value>
          <name>ack.wait.time</name>
        </value>
      </entry>
      <entry>
        <key>max.block.ms</key>
        <value>
          <name>max.block.ms</name>
        </value>
      </entry>
      <entry>
        <key>partitioner.class</key>
        <value>
          <name>partitioner.class</name>
        </value>
      </entry>
      <entry>
        <key>compression.type</key>
        <value>
          <name>compression.type</name>
        </value>
      </entry>
    </descriptors>
    <executionNode>ALL</executionNode>
    <lossTolerant>false</lossTolerant>
    <penaltyDuration>30 sec</penaltyDuration>
    <!-- properties: configured values; an entry with a key but no value element is left unset. -->
    <properties>
      <entry>
        <key>bootstrap.servers</key>
        <value>localhost:9092</value>
      </entry>
      <entry>
        <key>security.protocol</key>
        <value>PLAINTEXT</value>
      </entry>
      <entry>
        <key>sasl.kerberos.service.name</key>
      </entry>
      <entry>
        <key>ssl.context.service</key>
      </entry>
      <entry>
        <key>topic</key>
        <value>topic-A</value>
      </entry>
      <!-- NOTE(review): with Kafka acks=0 the producer does not wait for a broker acknowledgement,
           so delivery is not confirmed; verify this is intended for a count-verification flow. -->
      <entry>
        <key>acks</key>
        <value>0</value>
      </entry>
      <entry>
        <key>kafka-key</key>
      </entry>
      <entry>
        <key>key-attribute-encoding</key>
        <value>utf-8</value>
      </entry>
      <entry>
        <key>message-demarcator</key>
      </entry>
      <entry>
        <key>max.request.size</key>
        <value>1 MB</value>
      </entry>
      <entry>
        <key>ack.wait.time</key>
        <value>5 secs</value>
      </entry>
      <entry>
        <key>max.block.ms</key>
        <value>5 sec</value>
      </entry>
      <entry>
        <key>partitioner.class</key>
        <value>org.apache.kafka.clients.producer.internals.DefaultPartitioner</value>
      </entry>
      <entry>
        <key>compression.type</key>
        <value>none</value>
      </entry>
    </properties>
    <runDurationMillis>0</runDurationMillis>
    <schedulingPeriod>0 sec</schedulingPeriod>
    <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
    <yieldDuration>1 sec</yieldDuration>
  </config>
  <name>PublishKafka_0_10 to A</name>
  <!-- failure is dropped (auto-terminated); success is routed onward by connection 9037491f. -->
  <relationships>
    <autoTerminate>true</autoTerminate>
    <name>failure</name>
  </relationships>
  <relationships>
    <autoTerminate>false</autoTerminate>
    <name>success</name>
  </relationships>
  <style></style>
  <type>org.apache.nifi.processors.kafka.pubsub.PublishKafka_0_10</type>
</processors>
<!-- Processor 9025ba9c "GenerateFlowFile": the source of the flow. It is timer-driven with a 1s
     scheduling period, Batch Size 10, Data Format Text and custom text "1". Its "success" output
     feeds "PublishKafka_0_10 to A" (90250d1c). -->
<processors>
  <id>9025ba9c-0159-1000-0000-000000000000</id>
  <parentGroupId>90173dee-0159-1000-0000-000000000000</parentGroupId>
  <position>
    <x>80.7458444547216</x>
    <y>8.541472672997699</y>
  </position>
  <config>
    <bulletinLevel>WARN</bulletinLevel>
    <comments></comments>
    <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
    <!-- descriptors: the property names this processor exposes; values are set under properties. -->
    <descriptors>
      <entry>
        <key>File Size</key>
        <value>
          <name>File Size</name>
        </value>
      </entry>
      <entry>
        <key>Batch Size</key>
        <value>
          <name>Batch Size</name>
        </value>
      </entry>
      <entry>
        <key>Data Format</key>
        <value>
          <name>Data Format</name>
        </value>
      </entry>
      <entry>
        <key>Unique FlowFiles</key>
        <value>
          <name>Unique FlowFiles</name>
        </value>
      </entry>
      <entry>
        <key>generate-ff-custom-text</key>
        <value>
          <name>generate-ff-custom-text</name>
        </value>
      </entry>
    </descriptors>
    <executionNode>ALL</executionNode>
    <lossTolerant>false</lossTolerant>
    <penaltyDuration>30 sec</penaltyDuration>
    <!-- properties: configured values for the generated FlowFiles. -->
    <properties>
      <entry>
        <key>File Size</key>
        <value>0B</value>
      </entry>
      <entry>
        <key>Batch Size</key>
        <value>10</value>
      </entry>
      <entry>
        <key>Data Format</key>
        <value>Text</value>
      </entry>
      <entry>
        <key>Unique FlowFiles</key>
        <value>false</value>
      </entry>
      <entry>
        <key>generate-ff-custom-text</key>
        <value>1</value>
      </entry>
    </properties>
    <runDurationMillis>0</runDurationMillis>
    <!-- The only processor in this template with a non-zero scheduling period. -->
    <schedulingPeriod>1s</schedulingPeriod>
    <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
    <yieldDuration>1 sec</yieldDuration>
  </config>
  <name>GenerateFlowFile</name>
  <relationships>
    <autoTerminate>false</autoTerminate>
    <name>success</name>
  </relationships>
  <style></style>
  <type>org.apache.nifi.processors.standard.GenerateFlowFile</type>
</processors>
<!-- Processor 9036fa17 "UpdateAttribute": destination of the "success" output of
     "PublishKafka_0_10 to A" (90250d1c). No attribute rules are configured here.
     NOTE(review): its "success" relationship is neither auto-terminated nor connected by any
     connection in this template; presumably it is kept stopped so FlowFiles pile up in the
     inbound queue and can be counted. Confirm. -->
<processors>
  <id>9036fa17-0159-1000-0000-000000000000</id>
  <parentGroupId>90173dee-0159-1000-0000-000000000000</parentGroupId>
  <position>
    <x>80.7458444547216</x>
    <y>451.8423536320786</y>
  </position>
  <config>
    <bulletinLevel>WARN</bulletinLevel>
    <comments></comments>
    <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
    <!-- descriptors: the property names this processor exposes; values are set under properties. -->
    <descriptors>
      <entry>
        <key>Delete Attributes Expression</key>
        <value>
          <name>Delete Attributes Expression</name>
        </value>
      </entry>
      <entry>
        <key>Store State</key>
        <value>
          <name>Store State</name>
        </value>
      </entry>
      <entry>
        <key>Stateful Variables Initial Value</key>
        <value>
          <name>Stateful Variables Initial Value</name>
        </value>
      </entry>
    </descriptors>
    <executionNode>ALL</executionNode>
    <lossTolerant>false</lossTolerant>
    <penaltyDuration>30 sec</penaltyDuration>
    <!-- properties: only Store State carries a value; the other two entries are left unset. -->
    <properties>
      <entry>
        <key>Delete Attributes Expression</key>
      </entry>
      <entry>
        <key>Store State</key>
        <value>Do not store state</value>
      </entry>
      <entry>
        <key>Stateful Variables Initial Value</key>
      </entry>
    </properties>
    <runDurationMillis>0</runDurationMillis>
    <schedulingPeriod>0 sec</schedulingPeriod>
    <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
    <yieldDuration>1 sec</yieldDuration>
  </config>
  <name>UpdateAttribute</name>
  <relationships>
    <autoTerminate>false</autoTerminate>
    <name>success</name>
  </relationships>
  <style></style>
  <type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
<!-- Processor 903d9dd5 "ConsumeKafka_0_10 from B": consumes topic-B from localhost:9092 over PLAINTEXT
     using consumer group "nifi". Its "success" output feeds the "UpdateAttribute" processor 903d9dd6. -->
<processors>
  <id>903d9dd5-0159-1000-0000-000000000000</id>
  <parentGroupId>90173dee-0159-1000-0000-000000000000</parentGroupId>
  <position>
    <x>838.0612652017426</x>
    <y>8.541472672997699</y>
  </position>
  <config>
    <bulletinLevel>WARN</bulletinLevel>
    <comments></comments>
    <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
    <!-- descriptors: the property names this processor exposes; values are set under properties. -->
    <descriptors>
      <entry>
        <key>bootstrap.servers</key>
        <value>
          <name>bootstrap.servers</name>
        </value>
      </entry>
      <entry>
        <key>security.protocol</key>
        <value>
          <name>security.protocol</name>
        </value>
      </entry>
      <entry>
        <key>sasl.kerberos.service.name</key>
        <value>
          <name>sasl.kerberos.service.name</name>
        </value>
      </entry>
      <entry>
        <key>ssl.context.service</key>
        <value>
          <identifiesControllerService>org.apache.nifi.ssl.SSLContextService</identifiesControllerService>
          <name>ssl.context.service</name>
        </value>
      </entry>
      <entry>
        <key>topic</key>
        <value>
          <name>topic</name>
        </value>
      </entry>
      <entry>
        <key>group.id</key>
        <value>
          <name>group.id</name>
        </value>
      </entry>
      <entry>
        <key>auto.offset.reset</key>
        <value>
          <name>auto.offset.reset</name>
        </value>
      </entry>
      <entry>
        <key>key-attribute-encoding</key>
        <value>
          <name>key-attribute-encoding</name>
        </value>
      </entry>
      <entry>
        <key>message-demarcator</key>
        <value>
          <name>message-demarcator</name>
        </value>
      </entry>
      <entry>
        <key>max.poll.records</key>
        <value>
          <name>max.poll.records</name>
        </value>
      </entry>
      <entry>
        <key>max-uncommit-offset-wait</key>
        <value>
          <name>max-uncommit-offset-wait</name>
        </value>
      </entry>
    </descriptors>
    <executionNode>ALL</executionNode>
    <lossTolerant>false</lossTolerant>
    <penaltyDuration>30 sec</penaltyDuration>
    <!-- properties: configured values; an entry with a key but no value element is left unset. -->
    <properties>
      <entry>
        <key>bootstrap.servers</key>
        <value>localhost:9092</value>
      </entry>
      <entry>
        <key>security.protocol</key>
        <value>PLAINTEXT</value>
      </entry>
      <entry>
        <key>sasl.kerberos.service.name</key>
      </entry>
      <entry>
        <key>ssl.context.service</key>
      </entry>
      <entry>
        <key>topic</key>
        <value>topic-B</value>
      </entry>
      <!-- The same group id "nifi" is also used by "ConsumeKafka_0_10 from A". -->
      <entry>
        <key>group.id</key>
        <value>nifi</value>
      </entry>
      <entry>
        <key>auto.offset.reset</key>
        <value>latest</value>
      </entry>
      <entry>
        <key>key-attribute-encoding</key>
        <value>utf-8</value>
      </entry>
      <entry>
        <key>message-demarcator</key>
      </entry>
      <entry>
        <key>max.poll.records</key>
        <value>10000</value>
      </entry>
      <entry>
        <key>max-uncommit-offset-wait</key>
        <value>1 secs</value>
      </entry>
    </properties>
    <runDurationMillis>0</runDurationMillis>
    <schedulingPeriod>0 sec</schedulingPeriod>
    <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
    <yieldDuration>1 sec</yieldDuration>
  </config>
  <name>ConsumeKafka_0_10 from B</name>
  <relationships>
    <autoTerminate>false</autoTerminate>
    <name>success</name>
  </relationships>
  <style></style>
  <type>org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_0_10</type>
</processors>
<!-- Processor 903d9dd6 "UpdateAttribute": destination of the "success" output of
     "ConsumeKafka_0_10 from B" (903d9dd5). No attribute rules are configured here.
     NOTE(review): its "success" relationship is neither auto-terminated nor connected by any
     connection in this template; presumably it is kept stopped so FlowFiles pile up in the
     inbound queue and can be counted. Confirm. -->
<processors>
  <id>903d9dd6-0159-1000-0000-000000000000</id>
  <parentGroupId>90173dee-0159-1000-0000-000000000000</parentGroupId>
  <position>
    <x>838.0612652017426</x>
    <y>231.28485899361976</y>
  </position>
  <config>
    <bulletinLevel>WARN</bulletinLevel>
    <comments></comments>
    <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
    <!-- descriptors: the property names this processor exposes; values are set under properties. -->
    <descriptors>
      <entry>
        <key>Delete Attributes Expression</key>
        <value>
          <name>Delete Attributes Expression</name>
        </value>
      </entry>
      <entry>
        <key>Store State</key>
        <value>
          <name>Store State</name>
        </value>
      </entry>
      <entry>
        <key>Stateful Variables Initial Value</key>
        <value>
          <name>Stateful Variables Initial Value</name>
        </value>
      </entry>
    </descriptors>
    <executionNode>ALL</executionNode>
    <lossTolerant>false</lossTolerant>
    <penaltyDuration>30 sec</penaltyDuration>
    <!-- properties: only Store State carries a value; the other two entries are left unset. -->
    <properties>
      <entry>
        <key>Delete Attributes Expression</key>
      </entry>
      <entry>
        <key>Store State</key>
        <value>Do not store state</value>
      </entry>
      <entry>
        <key>Stateful Variables Initial Value</key>
      </entry>
    </properties>
    <runDurationMillis>0</runDurationMillis>
    <schedulingPeriod>0 sec</schedulingPeriod>
    <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
    <yieldDuration>1 sec</yieldDuration>
  </config>
  <name>UpdateAttribute</name>
  <relationships>
    <autoTerminate>false</autoTerminate>
    <name>success</name>
  </relationships>
  <style></style>
  <type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
<!-- Processor 904cbd5f "PublishKafka_0_10 to B": publishes FlowFiles received from
     "ConsumeKafka_0_10 from A" (901b92a4) to topic-B on localhost:9092 over PLAINTEXT.
     "failure" is auto-terminated; "success" feeds the "UpdateAttribute" processor 9020522a. -->
<processors>
  <id>904cbd5f-0159-1000-0000-000000000000</id>
  <parentGroupId>90173dee-0159-1000-0000-000000000000</parentGroupId>
  <position>
    <x>456.7124319622433</x>
    <y>231.28485899361976</y>
  </position>
  <config>
    <bulletinLevel>WARN</bulletinLevel>
    <comments></comments>
    <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
    <!-- descriptors: the property names this processor exposes; values are set under properties. -->
    <descriptors>
      <entry>
        <key>bootstrap.servers</key>
        <value>
          <name>bootstrap.servers</name>
        </value>
      </entry>
      <entry>
        <key>security.protocol</key>
        <value>
          <name>security.protocol</name>
        </value>
      </entry>
      <entry>
        <key>sasl.kerberos.service.name</key>
        <value>
          <name>sasl.kerberos.service.name</name>
        </value>
      </entry>
      <entry>
        <key>ssl.context.service</key>
        <value>
          <identifiesControllerService>org.apache.nifi.ssl.SSLContextService</identifiesControllerService>
          <name>ssl.context.service</name>
        </value>
      </entry>
      <entry>
        <key>topic</key>
        <value>
          <name>topic</name>
        </value>
      </entry>
      <entry>
        <key>acks</key>
        <value>
          <name>acks</name>
        </value>
      </entry>
      <entry>
        <key>kafka-key</key>
        <value>
          <name>kafka-key</name>
        </value>
      </entry>
      <entry>
        <key>key-attribute-encoding</key>
        <value>
          <name>key-attribute-encoding</name>
        </value>
      </entry>
      <entry>
        <key>message-demarcator</key>
        <value>
          <name>message-demarcator</name>
        </value>
      </entry>
      <entry>
        <key>max.request.size</key>
        <value>
          <name>max.request.size</name>
        </value>
      </entry>
      <entry>
        <key>ack.wait.time</key>
        <value>
          <name>ack.wait.time</name>
        </value>
      </entry>
      <entry>
        <key>max.block.ms</key>
        <value>
          <name>max.block.ms</name>
        </value>
      </entry>
      <entry>
        <key>partitioner.class</key>
        <value>
          <name>partitioner.class</name>
        </value>
      </entry>
      <entry>
        <key>compression.type</key>
        <value>
          <name>compression.type</name>
        </value>
      </entry>
    </descriptors>
    <executionNode>ALL</executionNode>
    <lossTolerant>false</lossTolerant>
    <penaltyDuration>30 sec</penaltyDuration>
    <!-- properties: configured values; an entry with a key but no value element is left unset. -->
    <properties>
      <entry>
        <key>bootstrap.servers</key>
        <value>localhost:9092</value>
      </entry>
      <entry>
        <key>security.protocol</key>
        <value>PLAINTEXT</value>
      </entry>
      <entry>
        <key>sasl.kerberos.service.name</key>
      </entry>
      <entry>
        <key>ssl.context.service</key>
      </entry>
      <entry>
        <key>topic</key>
        <value>topic-B</value>
      </entry>
      <!-- NOTE(review): with Kafka acks=0 the producer does not wait for a broker acknowledgement,
           so delivery is not confirmed; verify this is intended for a count-verification flow. -->
      <entry>
        <key>acks</key>
        <value>0</value>
      </entry>
      <entry>
        <key>kafka-key</key>
      </entry>
      <entry>
        <key>key-attribute-encoding</key>
        <value>utf-8</value>
      </entry>
      <entry>
        <key>message-demarcator</key>
      </entry>
      <entry>
        <key>max.request.size</key>
        <value>1 MB</value>
      </entry>
      <entry>
        <key>ack.wait.time</key>
        <value>5 secs</value>
      </entry>
      <entry>
        <key>max.block.ms</key>
        <value>5 sec</value>
      </entry>
      <entry>
        <key>partitioner.class</key>
        <value>org.apache.kafka.clients.producer.internals.DefaultPartitioner</value>
      </entry>
      <entry>
        <key>compression.type</key>
        <value>none</value>
      </entry>
    </properties>
    <runDurationMillis>0</runDurationMillis>
    <schedulingPeriod>0 sec</schedulingPeriod>
    <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
    <yieldDuration>1 sec</yieldDuration>
  </config>
  <name>PublishKafka_0_10 to B</name>
  <!-- failure is dropped (auto-terminated); success is routed onward by connection 904d0486. -->
  <relationships>
    <autoTerminate>true</autoTerminate>
    <name>failure</name>
  </relationships>
  <relationships>
    <autoTerminate>false</autoTerminate>
    <name>success</name>
  </relationships>
  <style></style>
  <type>org.apache.nifi.processors.kafka.pubsub.PublishKafka_0_10</type>
</processors>
</contents>
<!-- Display name of the process group; identical to the template name. -->
<name>Kafka FlowFile Count Test</name>
</processGroups>
</snippet>
<!-- Timestamp recorded in the template. NOTE(review): the date is locale-formatted, so whether
     01/12 means January 12 or December 1 cannot be determined from this file alone. -->
<timestamp>01/12/2017 11:05:56 JST</timestamp>
</template>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment