Skip to content

Instantly share code, notes, and snippets.

@deinspanjer
Created October 30, 2016 04:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save deinspanjer/5936402bb1ab2eea48fba166791fc26b to your computer and use it in GitHub Desktop.
Save deinspanjer/5936402bb1ab2eea48fba166791fc26b to your computer and use it in GitHub Desktop.
<?xml version="1.0" ?>
<template encoding-version="1.0">
<description>A simple GenerateFlowFile -&gt; PublishKafka_0_10 works fine, but this doesn't.
I really thought I had figured out that the problem was just trying to store the Avro records so I put in the conversion to JSON, but it is still failing. The error message is:
2016-10-29 23:52:49,629 WARN [Timer-Driven Process Thread-1] o.a.n.p.kafka.pubsub.PublishKafka_0_10 PublishKafka_0_10[id=12b0a84d-0158-1000-ce29-967712bf5ab6] Timed out while waiting for acks from Kafka</description>
<groupId>12988df2-0158-1000-88e0-450d4f9737a0</groupId>
<name>ProblemWithPublishKafka10</name>
<snippet>
<connections>
<id>12b144f7-0158-1000-0000-000000000000</id>
<parentGroupId>12988df2-0158-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>12988df2-0158-1000-0000-000000000000</groupId>
<id>12b8bdd6-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<prioritizers>org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer</prioritizers>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>12988df2-0158-1000-0000-000000000000</groupId>
<id>12ae9604-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<!-- Connection: routes the "success" relationship of processor 12b8bdd6 (ExecuteSQL)
     to processor 13695bc8 (SplitAvro). No prioritizer is configured. -->
<connections>
  <id>12bccafc-0158-1000-0000-000000000000</id>
  <parentGroupId>12988df2-0158-1000-0000-000000000000</parentGroupId>
  <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
  <backPressureObjectThreshold>10000</backPressureObjectThreshold>
  <destination>
    <groupId>12988df2-0158-1000-0000-000000000000</groupId>
    <id>13695bc8-0158-1000-0000-000000000000</id>
    <type>PROCESSOR</type>
  </destination>
  <flowFileExpiration>0 sec</flowFileExpiration>
  <labelIndex>1</labelIndex>
  <name/>
  <selectedRelationships>success</selectedRelationships>
  <source>
    <groupId>12988df2-0158-1000-0000-000000000000</groupId>
    <id>12b8bdd6-0158-1000-0000-000000000000</id>
    <type>PROCESSOR</type>
  </source>
  <zIndex>0</zIndex>
</connections>
<!-- Connection: self-loop on processor 12b8bdd6 (ExecuteSQL). Its "failure" relationship is
     fed back into the same processor; the two bend points only shape the drawn line. -->
<connections>
  <id>12bd6485-0158-1000-0000-000000000000</id>
  <parentGroupId>12988df2-0158-1000-0000-000000000000</parentGroupId>
  <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
  <backPressureObjectThreshold>10000</backPressureObjectThreshold>
  <bends>
    <x>1215.3377418508821</x>
    <y>103.96678552477323</y>
  </bends>
  <bends>
    <x>1215.3377418508821</x>
    <y>61.64195642321073</y>
  </bends>
  <destination>
    <groupId>12988df2-0158-1000-0000-000000000000</groupId>
    <id>12b8bdd6-0158-1000-0000-000000000000</id>
    <type>PROCESSOR</type>
  </destination>
  <flowFileExpiration>0 sec</flowFileExpiration>
  <labelIndex>1</labelIndex>
  <name/>
  <selectedRelationships>failure</selectedRelationships>
  <source>
    <groupId>12988df2-0158-1000-0000-000000000000</groupId>
    <id>12b8bdd6-0158-1000-0000-000000000000</id>
    <type>PROCESSOR</type>
  </source>
  <zIndex>0</zIndex>
</connections>
<!-- Connection: self-loop on processor 12b0a84d (PublishKafka_0_10). Its "failure"
     relationship is fed back into the same processor; bends only shape the drawn line. -->
<connections>
  <id>12c82d36-0158-1000-0000-000000000000</id>
  <parentGroupId>12988df2-0158-1000-0000-000000000000</parentGroupId>
  <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
  <backPressureObjectThreshold>10000</backPressureObjectThreshold>
  <bends>
    <x>455.0</x>
    <y>514.4790000915527</y>
  </bends>
  <bends>
    <x>552.542057036429</x>
    <y>560.6384812340018</y>
  </bends>
  <destination>
    <groupId>12988df2-0158-1000-0000-000000000000</groupId>
    <id>12b0a84d-0158-1000-0000-000000000000</id>
    <type>PROCESSOR</type>
  </destination>
  <flowFileExpiration>0 sec</flowFileExpiration>
  <labelIndex>1</labelIndex>
  <name/>
  <selectedRelationships>failure</selectedRelationships>
  <source>
    <groupId>12988df2-0158-1000-0000-000000000000</groupId>
    <id>12b0a84d-0158-1000-0000-000000000000</id>
    <type>PROCESSOR</type>
  </source>
  <zIndex>0</zIndex>
</connections>
<!-- Connection: routes the "split" relationship of processor 13695bc8 (SplitAvro)
     to processor 138082d8 (ConvertAvroToJSON). -->
<connections>
  <id>1369b859-0158-1000-0000-000000000000</id>
  <parentGroupId>12988df2-0158-1000-0000-000000000000</parentGroupId>
  <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
  <backPressureObjectThreshold>10000</backPressureObjectThreshold>
  <destination>
    <groupId>12988df2-0158-1000-0000-000000000000</groupId>
    <id>138082d8-0158-1000-0000-000000000000</id>
    <type>PROCESSOR</type>
  </destination>
  <flowFileExpiration>0 sec</flowFileExpiration>
  <labelIndex>1</labelIndex>
  <name/>
  <selectedRelationships>split</selectedRelationships>
  <source>
    <groupId>12988df2-0158-1000-0000-000000000000</groupId>
    <id>13695bc8-0158-1000-0000-000000000000</id>
    <type>PROCESSOR</type>
  </source>
  <zIndex>0</zIndex>
</connections>
<!-- Connection: routes the "success" relationship of processor 138082d8 (ConvertAvroToJSON)
     to processor 12b0a84d (PublishKafka_0_10). -->
<connections>
  <id>13814c07-0158-1000-0000-000000000000</id>
  <parentGroupId>12988df2-0158-1000-0000-000000000000</parentGroupId>
  <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
  <backPressureObjectThreshold>10000</backPressureObjectThreshold>
  <destination>
    <groupId>12988df2-0158-1000-0000-000000000000</groupId>
    <id>12b0a84d-0158-1000-0000-000000000000</id>
    <type>PROCESSOR</type>
  </destination>
  <flowFileExpiration>0 sec</flowFileExpiration>
  <labelIndex>1</labelIndex>
  <name/>
  <selectedRelationships>success</selectedRelationships>
  <source>
    <groupId>12988df2-0158-1000-0000-000000000000</groupId>
    <id>138082d8-0158-1000-0000-000000000000</id>
    <type>PROCESSOR</type>
  </source>
  <zIndex>0</zIndex>
</connections>
<!-- Connection: routes the "failure" relationship of processor 138082d8 (ConvertAvroToJSON)
     to processor 1382d09a (LogAttribute). -->
<connections>
  <id>1382f78c-0158-1000-0000-000000000000</id>
  <parentGroupId>12988df2-0158-1000-0000-000000000000</parentGroupId>
  <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
  <backPressureObjectThreshold>10000</backPressureObjectThreshold>
  <destination>
    <groupId>12988df2-0158-1000-0000-000000000000</groupId>
    <id>1382d09a-0158-1000-0000-000000000000</id>
    <type>PROCESSOR</type>
  </destination>
  <flowFileExpiration>0 sec</flowFileExpiration>
  <labelIndex>1</labelIndex>
  <name/>
  <selectedRelationships>failure</selectedRelationships>
  <source>
    <groupId>12988df2-0158-1000-0000-000000000000</groupId>
    <id>138082d8-0158-1000-0000-000000000000</id>
    <type>PROCESSOR</type>
  </source>
  <zIndex>0</zIndex>
</connections>
<!-- Controller service 0b21dfff: a DBCPConnectionPool pointing at a local PostgreSQL database.
     The GenerateTableFetch and ExecuteSQL processors in this template reference it by id.
     The Password entry carries no value in this template, so it has to be supplied after
     import (presumably stripped on export; confirm).
     NOTE(review): its parentGroupId (e1eecf1d...) differs from the group id used by the
     processors and connections in this snippet (12988df2...). -->
<controllerServices>
  <id>0b21dfff-0158-1000-0000-000000000000</id>
  <parentGroupId>e1eecf1d-0157-1000-0000-000000000000</parentGroupId>
  <comments/>
  <descriptors>
    <entry>
      <key>Database Connection URL</key>
      <value>
        <name>Database Connection URL</name>
      </value>
    </entry>
    <entry>
      <key>Database Driver Class Name</key>
      <value>
        <name>Database Driver Class Name</name>
      </value>
    </entry>
    <entry>
      <key>database-driver-locations</key>
      <value>
        <name>database-driver-locations</name>
      </value>
    </entry>
    <entry>
      <key>Database User</key>
      <value>
        <name>Database User</name>
      </value>
    </entry>
    <entry>
      <key>Password</key>
      <value>
        <name>Password</name>
      </value>
    </entry>
    <entry>
      <key>Max Wait Time</key>
      <value>
        <name>Max Wait Time</name>
      </value>
    </entry>
    <entry>
      <key>Max Total Connections</key>
      <value>
        <name>Max Total Connections</name>
      </value>
    </entry>
  </descriptors>
  <name>DBCPConnectionPool</name>
  <properties>
    <entry>
      <key>Database Connection URL</key>
      <value>jdbc:postgresql://localhost/pfizer_production_2016_10_21</value>
    </entry>
    <entry>
      <key>Database Driver Class Name</key>
      <value>org.postgresql.Driver</value>
    </entry>
    <entry>
      <key>database-driver-locations</key>
      <value>/Users/dre/src/jdbc/postgresql-9.4.1211.jar</value>
    </entry>
    <entry>
      <key>Database User</key>
      <value>dre</value>
    </entry>
    <entry>
      <key>Password</key>
    </entry>
    <entry>
      <key>Max Wait Time</key>
      <value>500 millis</value>
    </entry>
    <entry>
      <key>Max Total Connections</key>
      <value>8</value>
    </entry>
  </properties>
  <state>ENABLED</state>
  <type>org.apache.nifi.dbcp.DBCPConnectionPool</type>
</controllerServices>
<!-- Processor 12ae9604: GenerateTableFetch, the head of the flow.
     Configured against controller service 0b21dfff for table "test_table" with a
     partition size of 10000; "Columns to Return" and "Maximum-value Columns" are unset.
     Its only relationship, "success", is not auto-terminated. -->
<processors>
  <id>12ae9604-0158-1000-0000-000000000000</id>
  <parentGroupId>12988df2-0158-1000-0000-000000000000</parentGroupId>
  <position>
    <x>9.71087646484375</x>
    <y>0.0</y>
  </position>
  <config>
    <bulletinLevel>WARN</bulletinLevel>
    <comments/>
    <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
    <descriptors>
      <entry>
        <key>Database Connection Pooling Service</key>
        <value>
          <identifiesControllerService>org.apache.nifi.dbcp.DBCPService</identifiesControllerService>
          <name>Database Connection Pooling Service</name>
        </value>
      </entry>
      <entry>
        <key>db-fetch-db-type</key>
        <value>
          <name>db-fetch-db-type</name>
        </value>
      </entry>
      <entry>
        <key>Table Name</key>
        <value>
          <name>Table Name</name>
        </value>
      </entry>
      <entry>
        <key>Columns to Return</key>
        <value>
          <name>Columns to Return</name>
        </value>
      </entry>
      <entry>
        <key>Maximum-value Columns</key>
        <value>
          <name>Maximum-value Columns</name>
        </value>
      </entry>
      <entry>
        <key>Max Wait Time</key>
        <value>
          <name>Max Wait Time</name>
        </value>
      </entry>
      <entry>
        <key>gen-table-fetch-partition-size</key>
        <value>
          <name>gen-table-fetch-partition-size</name>
        </value>
      </entry>
    </descriptors>
    <lossTolerant>false</lossTolerant>
    <penaltyDuration>30 sec</penaltyDuration>
    <properties>
      <entry>
        <key>Database Connection Pooling Service</key>
        <value>0b21dfff-0158-1000-0000-000000000000</value>
      </entry>
      <entry>
        <key>db-fetch-db-type</key>
        <value>Generic</value>
      </entry>
      <entry>
        <key>Table Name</key>
        <value>test_table</value>
      </entry>
      <entry>
        <key>Columns to Return</key>
      </entry>
      <entry>
        <key>Maximum-value Columns</key>
      </entry>
      <entry>
        <key>Max Wait Time</key>
        <value>0 seconds</value>
      </entry>
      <entry>
        <key>gen-table-fetch-partition-size</key>
        <value>10000</value>
      </entry>
    </properties>
    <runDurationMillis>0</runDurationMillis>
    <schedulingPeriod>0 sec</schedulingPeriod>
    <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
    <yieldDuration>1 sec</yieldDuration>
  </config>
  <name>GenerateTableFetch</name>
  <relationships>
    <autoTerminate>false</autoTerminate>
    <name>success</name>
  </relationships>
  <style/>
  <type>org.apache.nifi.processors.standard.GenerateTableFetch</type>
</processors>
<!-- Processor 12b0a84d: PublishKafka_0_10, the end of the flow.
     Publishes to topic "test.topic-a" on localhost:9092 over PLAINTEXT with acks=0,
     max.request.size 1 MB, max.block.ms 5 sec, no compression; kafka-key and
     message-demarcator are unset. "success" is auto-terminated; "failure" is not
     (a connection elsewhere in the template loops it back into this processor).
     The template description reports the warning "Timed out while waiting for acks
     from Kafka" from a PublishKafka_0_10 instance. -->
<processors>
  <id>12b0a84d-0158-1000-0000-000000000000</id>
  <parentGroupId>12988df2-0158-1000-0000-000000000000</parentGroupId>
  <position>
    <x>0.0</x>
    <y>474.4790000915527</y>
  </position>
  <config>
    <bulletinLevel>WARN</bulletinLevel>
    <comments/>
    <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
    <descriptors>
      <entry>
        <key>bootstrap.servers</key>
        <value>
          <name>bootstrap.servers</name>
        </value>
      </entry>
      <entry>
        <key>security.protocol</key>
        <value>
          <name>security.protocol</name>
        </value>
      </entry>
      <entry>
        <key>sasl.kerberos.service.name</key>
        <value>
          <name>sasl.kerberos.service.name</name>
        </value>
      </entry>
      <entry>
        <key>ssl.context.service</key>
        <value>
          <identifiesControllerService>org.apache.nifi.ssl.SSLContextService</identifiesControllerService>
          <name>ssl.context.service</name>
        </value>
      </entry>
      <entry>
        <key>topic</key>
        <value>
          <name>topic</name>
        </value>
      </entry>
      <entry>
        <key>acks</key>
        <value>
          <name>acks</name>
        </value>
      </entry>
      <entry>
        <key>kafka-key</key>
        <value>
          <name>kafka-key</name>
        </value>
      </entry>
      <entry>
        <key>key-attribute-encoding</key>
        <value>
          <name>key-attribute-encoding</name>
        </value>
      </entry>
      <entry>
        <key>message-demarcator</key>
        <value>
          <name>message-demarcator</name>
        </value>
      </entry>
      <entry>
        <key>max.request.size</key>
        <value>
          <name>max.request.size</name>
        </value>
      </entry>
      <entry>
        <key>max.block.ms</key>
        <value>
          <name>max.block.ms</name>
        </value>
      </entry>
      <entry>
        <key>partitioner.class</key>
        <value>
          <name>partitioner.class</name>
        </value>
      </entry>
      <entry>
        <key>compression.type</key>
        <value>
          <name>compression.type</name>
        </value>
      </entry>
    </descriptors>
    <lossTolerant>false</lossTolerant>
    <penaltyDuration>30 sec</penaltyDuration>
    <properties>
      <entry>
        <key>bootstrap.servers</key>
        <value>localhost:9092</value>
      </entry>
      <entry>
        <key>security.protocol</key>
        <value>PLAINTEXT</value>
      </entry>
      <entry>
        <key>sasl.kerberos.service.name</key>
      </entry>
      <entry>
        <key>ssl.context.service</key>
      </entry>
      <entry>
        <key>topic</key>
        <value>test.topic-a</value>
      </entry>
      <entry>
        <key>acks</key>
        <value>0</value>
      </entry>
      <entry>
        <key>kafka-key</key>
      </entry>
      <entry>
        <key>key-attribute-encoding</key>
        <value>utf-8</value>
      </entry>
      <entry>
        <key>message-demarcator</key>
      </entry>
      <entry>
        <key>max.request.size</key>
        <value>1 MB</value>
      </entry>
      <entry>
        <key>max.block.ms</key>
        <value>5 sec</value>
      </entry>
      <entry>
        <key>partitioner.class</key>
        <value>org.apache.kafka.clients.producer.internals.DefaultPartitioner</value>
      </entry>
      <entry>
        <key>compression.type</key>
        <value>none</value>
      </entry>
    </properties>
    <runDurationMillis>0</runDurationMillis>
    <schedulingPeriod>0 sec</schedulingPeriod>
    <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
    <yieldDuration>1 sec</yieldDuration>
  </config>
  <name>PublishKafka_0_10</name>
  <relationships>
    <autoTerminate>false</autoTerminate>
    <name>failure</name>
  </relationships>
  <relationships>
    <autoTerminate>true</autoTerminate>
    <name>success</name>
  </relationships>
  <style/>
  <type>org.apache.nifi.processors.kafka.pubsub.PublishKafka_0_10</type>
</processors>
<!-- Processor 12b8bdd6: ExecuteSQL, bound to controller service 0b21dfff.
     "SQL select query" has no value here; presumably the statement is taken from the
     incoming flow file produced by GenerateTableFetch (confirm against the NiFi docs).
     Neither "failure" nor "success" is auto-terminated. -->
<processors>
  <id>12b8bdd6-0158-1000-0000-000000000000</id>
  <parentGroupId>12988df2-0158-1000-0000-000000000000</parentGroupId>
  <position>
    <x>669.9247578645889</x>
    <y>11.621717859695309</y>
  </position>
  <config>
    <bulletinLevel>WARN</bulletinLevel>
    <comments/>
    <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
    <descriptors>
      <entry>
        <key>Database Connection Pooling Service</key>
        <value>
          <identifiesControllerService>org.apache.nifi.dbcp.DBCPService</identifiesControllerService>
          <name>Database Connection Pooling Service</name>
        </value>
      </entry>
      <entry>
        <key>SQL select query</key>
        <value>
          <name>SQL select query</name>
        </value>
      </entry>
      <entry>
        <key>Max Wait Time</key>
        <value>
          <name>Max Wait Time</name>
        </value>
      </entry>
    </descriptors>
    <lossTolerant>false</lossTolerant>
    <penaltyDuration>30 sec</penaltyDuration>
    <properties>
      <entry>
        <key>Database Connection Pooling Service</key>
        <value>0b21dfff-0158-1000-0000-000000000000</value>
      </entry>
      <entry>
        <key>SQL select query</key>
      </entry>
      <entry>
        <key>Max Wait Time</key>
        <value>0 seconds</value>
      </entry>
    </properties>
    <runDurationMillis>0</runDurationMillis>
    <schedulingPeriod>0 sec</schedulingPeriod>
    <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
    <yieldDuration>1 sec</yieldDuration>
  </config>
  <name>ExecuteSQL</name>
  <relationships>
    <autoTerminate>false</autoTerminate>
    <name>failure</name>
  </relationships>
  <relationships>
    <autoTerminate>false</autoTerminate>
    <name>success</name>
  </relationships>
  <style/>
  <type>org.apache.nifi.processors.standard.ExecuteSQL</type>
</processors>
<!-- Processor 13695bc8: SplitAvro.
     Configured with Split Strategy "Record", Output Size 1, Output Strategy "Datafile"
     and Transfer Metadata true. "failure" and "original" are auto-terminated; only
     "split" continues downstream. -->
<processors>
  <id>13695bc8-0158-1000-0000-000000000000</id>
  <parentGroupId>12988df2-0158-1000-0000-000000000000</parentGroupId>
  <position>
    <x>16.710931443810523</x>
    <y>241.5756884260049</y>
  </position>
  <config>
    <bulletinLevel>WARN</bulletinLevel>
    <comments/>
    <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
    <descriptors>
      <entry>
        <key>Split Strategy</key>
        <value>
          <name>Split Strategy</name>
        </value>
      </entry>
      <entry>
        <key>Output Size</key>
        <value>
          <name>Output Size</name>
        </value>
      </entry>
      <entry>
        <key>Output Strategy</key>
        <value>
          <name>Output Strategy</name>
        </value>
      </entry>
      <entry>
        <key>Transfer Metadata</key>
        <value>
          <name>Transfer Metadata</name>
        </value>
      </entry>
    </descriptors>
    <lossTolerant>false</lossTolerant>
    <penaltyDuration>30 sec</penaltyDuration>
    <properties>
      <entry>
        <key>Split Strategy</key>
        <value>Record</value>
      </entry>
      <entry>
        <key>Output Size</key>
        <value>1</value>
      </entry>
      <entry>
        <key>Output Strategy</key>
        <value>Datafile</value>
      </entry>
      <entry>
        <key>Transfer Metadata</key>
        <value>true</value>
      </entry>
    </properties>
    <runDurationMillis>0</runDurationMillis>
    <schedulingPeriod>0 sec</schedulingPeriod>
    <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
    <yieldDuration>1 sec</yieldDuration>
  </config>
  <name>SplitAvro</name>
  <relationships>
    <autoTerminate>true</autoTerminate>
    <name>failure</name>
  </relationships>
  <relationships>
    <autoTerminate>true</autoTerminate>
    <name>original</name>
  </relationships>
  <relationships>
    <autoTerminate>false</autoTerminate>
    <name>split</name>
  </relationships>
  <style/>
  <type>org.apache.nifi.processors.avro.SplitAvro</type>
</processors>
<!-- Processor 138082d8: ConvertAvroToJSON.
     Configured with JSON container options "none" and Wrap Single Record false; the
     "Avro schema" property is unset. Neither "failure" nor "success" is auto-terminated. -->
<processors>
  <id>138082d8-0158-1000-0000-000000000000</id>
  <parentGroupId>12988df2-0158-1000-0000-000000000000</parentGroupId>
  <position>
    <x>672.3854389135811</x>
    <y>239.64518434582578</y>
  </position>
  <config>
    <bulletinLevel>WARN</bulletinLevel>
    <comments/>
    <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
    <descriptors>
      <entry>
        <key>JSON container options</key>
        <value>
          <name>JSON container options</name>
        </value>
      </entry>
      <entry>
        <key>Wrap Single Record</key>
        <value>
          <name>Wrap Single Record</name>
        </value>
      </entry>
      <entry>
        <key>Avro schema</key>
        <value>
          <name>Avro schema</name>
        </value>
      </entry>
    </descriptors>
    <lossTolerant>false</lossTolerant>
    <penaltyDuration>30 sec</penaltyDuration>
    <properties>
      <entry>
        <key>JSON container options</key>
        <value>none</value>
      </entry>
      <entry>
        <key>Wrap Single Record</key>
        <value>false</value>
      </entry>
      <entry>
        <key>Avro schema</key>
      </entry>
    </properties>
    <runDurationMillis>0</runDurationMillis>
    <schedulingPeriod>0 sec</schedulingPeriod>
    <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
    <yieldDuration>1 sec</yieldDuration>
  </config>
  <name>ConvertAvroToJSON</name>
  <relationships>
    <autoTerminate>false</autoTerminate>
    <name>failure</name>
  </relationships>
  <relationships>
    <autoTerminate>false</autoTerminate>
    <name>success</name>
  </relationships>
  <style/>
  <type>org.apache.nifi.processors.avro.ConvertAvroToJSON</type>
</processors>
<!-- Processor 1382d09a: LogAttribute.
     Configured with Log Level "info" and Log Payload true; the attribute filters and
     the log prefix are unset. Its "success" relationship is auto-terminated. -->
<processors>
  <id>1382d09a-0158-1000-0000-000000000000</id>
  <parentGroupId>12988df2-0158-1000-0000-000000000000</parentGroupId>
  <position>
    <x>760.7603886512779</x>
    <y>464.42437212852764</y>
  </position>
  <config>
    <bulletinLevel>WARN</bulletinLevel>
    <comments/>
    <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
    <descriptors>
      <entry>
        <key>Log Level</key>
        <value>
          <name>Log Level</name>
        </value>
      </entry>
      <entry>
        <key>Log Payload</key>
        <value>
          <name>Log Payload</name>
        </value>
      </entry>
      <entry>
        <key>Attributes to Log</key>
        <value>
          <name>Attributes to Log</name>
        </value>
      </entry>
      <entry>
        <key>Attributes to Ignore</key>
        <value>
          <name>Attributes to Ignore</name>
        </value>
      </entry>
      <entry>
        <key>Log prefix</key>
        <value>
          <name>Log prefix</name>
        </value>
      </entry>
    </descriptors>
    <lossTolerant>false</lossTolerant>
    <penaltyDuration>30 sec</penaltyDuration>
    <properties>
      <entry>
        <key>Log Level</key>
        <value>info</value>
      </entry>
      <entry>
        <key>Log Payload</key>
        <value>true</value>
      </entry>
      <entry>
        <key>Attributes to Log</key>
      </entry>
      <entry>
        <key>Attributes to Ignore</key>
      </entry>
      <entry>
        <key>Log prefix</key>
      </entry>
    </properties>
    <runDurationMillis>0</runDurationMillis>
    <schedulingPeriod>0 sec</schedulingPeriod>
    <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
    <yieldDuration>1 sec</yieldDuration>
  </config>
  <name>LogAttribute</name>
  <relationships>
    <autoTerminate>true</autoTerminate>
    <name>success</name>
  </relationships>
  <style/>
  <type>org.apache.nifi.processors.standard.LogAttribute</type>
</processors>
</snippet>
<timestamp>10/30/2016 00:06:21 EDT</timestamp>
</template>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment