Skip to content

Instantly share code, notes, and snippets.

@ijokarumawak
Last active April 27, 2017 17:42
Show Gist options
  • Save ijokarumawak/e552885a29c53ce60012cacef62a3342 to your computer and use it in GitHub Desktop.
Save ijokarumawak/e552885a29c53ce60012cacef62a3342 to your computer and use it in GitHub Desktop.
Examples utilizing RollbackOnFailure added by NIFI-3415.

Examples utilizing RollbackOnFailure added by NIFI-3415

PutSQL

By default, PutSQL transfers a failed FlowFile to the failure or retry relationship:

With Rollback on Failure enabled, PutSQL rolls back the transaction and keeps the input FlowFiles in its incoming queue:

PutHiveQL

With Rollback on Failure enabled:

If the failure happens on the 1st input FlowFile:

If the failure happens on the 2nd input FlowFile:

PutHiveStreaming

With Rollback on Failure enabled:

If the failure happens on an Avro record in the 1st transaction:

If the failure happens on an Avro record in the 2nd transaction:

<?xml version="1.0" encoding="UTF-8"?>
<!--
  NiFi flow template "PutHiveQL" (process group "NIFI-3415 PutHiveQL").
  Demonstrates PutHiveQL with rollback-on-failure set to true.
  GenerateFlowFile emits "insert into input values (?, ?)" FlowFiles. Its
  success relationship has two connections, so each FlowFile goes to both
  "Set Successful Attribute" and "Set Illegal Attribute". Both paths merge
  in a funnel that feeds PutHiveQL. PutHiveQL's success, failure and retry
  relationships all lead to a single terminal UpdateAttribute processor.
  NOTE(review): the Hive JDBC URL and user configured in HiveConnectionPool
  are environment-specific; adjust them after importing the template.
-->
<template encoding-version="1.0">
  <description>Example utilizing RollbackOnFailure added by NIFI-3415: PutHiveQL with rollback-on-failure enabled.</description>
  <groupId>7c84501d-d10c-407c-b9f3-1d80e38fe36a</groupId>
  <name>PutHiveQL</name>
  <snippet>
    <processGroups>
      <id>4238cef0-015b-1000-0000-000000000000</id>
      <parentGroupId>7c84501d-d10c-407c-0000-000000000000</parentGroupId>
      <position>
        <x>0.0</x>
        <y>0.0</y>
      </position>
      <comments></comments>
      <contents>
        <!-- PutHiveQL (success) to terminal UpdateAttribute -->
        <connections>
          <id>423d0fdf-015b-1000-0000-000000000000</id>
          <parentGroupId>4238cef0-015b-1000-0000-000000000000</parentGroupId>
          <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
          <backPressureObjectThreshold>10000</backPressureObjectThreshold>
          <destination>
            <groupId>4238cef0-015b-1000-0000-000000000000</groupId>
            <id>423cf50a-015b-1000-0000-000000000000</id>
            <type>PROCESSOR</type>
          </destination>
          <flowFileExpiration>0 sec</flowFileExpiration>
          <labelIndex>1</labelIndex>
          <name></name>
          <selectedRelationships>success</selectedRelationships>
          <source>
            <groupId>4238cef0-015b-1000-0000-000000000000</groupId>
            <id>4238f07e-015b-1000-0000-000000000000</id>
            <type>PROCESSOR</type>
          </source>
          <zIndex>0</zIndex>
        </connections>
        <!-- PutHiveQL (failure) to terminal UpdateAttribute -->
        <connections>
          <id>423d1fca-015b-1000-0000-000000000000</id>
          <parentGroupId>4238cef0-015b-1000-0000-000000000000</parentGroupId>
          <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
          <backPressureObjectThreshold>10000</backPressureObjectThreshold>
          <bends>
            <x>454.5</x>
            <y>341.0</y>
          </bends>
          <destination>
            <groupId>4238cef0-015b-1000-0000-000000000000</groupId>
            <id>423cf50a-015b-1000-0000-000000000000</id>
            <type>PROCESSOR</type>
          </destination>
          <flowFileExpiration>0 sec</flowFileExpiration>
          <labelIndex>1</labelIndex>
          <name></name>
          <selectedRelationships>failure</selectedRelationships>
          <source>
            <groupId>4238cef0-015b-1000-0000-000000000000</groupId>
            <id>4238f07e-015b-1000-0000-000000000000</id>
            <type>PROCESSOR</type>
          </source>
          <zIndex>0</zIndex>
        </connections>
        <!-- PutHiveQL (retry) to terminal UpdateAttribute -->
        <connections>
          <id>423d32b1-015b-1000-0000-000000000000</id>
          <parentGroupId>4238cef0-015b-1000-0000-000000000000</parentGroupId>
          <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
          <backPressureObjectThreshold>10000</backPressureObjectThreshold>
          <bends>
            <x>854.5</x>
            <y>341.0</y>
          </bends>
          <destination>
            <groupId>4238cef0-015b-1000-0000-000000000000</groupId>
            <id>423cf50a-015b-1000-0000-000000000000</id>
            <type>PROCESSOR</type>
          </destination>
          <flowFileExpiration>0 sec</flowFileExpiration>
          <labelIndex>1</labelIndex>
          <name></name>
          <selectedRelationships>retry</selectedRelationships>
          <source>
            <groupId>4238cef0-015b-1000-0000-000000000000</groupId>
            <id>4238f07e-015b-1000-0000-000000000000</id>
            <type>PROCESSOR</type>
          </source>
          <zIndex>0</zIndex>
        </connections>
        <!-- GenerateFlowFile (success) to Set Illegal Attribute -->
        <connections>
          <id>4245b05e-015b-1000-0000-000000000000</id>
          <parentGroupId>4238cef0-015b-1000-0000-000000000000</parentGroupId>
          <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
          <backPressureObjectThreshold>10000</backPressureObjectThreshold>
          <destination>
            <groupId>4238cef0-015b-1000-0000-000000000000</groupId>
            <id>42445b9b-015b-1000-0000-000000000000</id>
            <type>PROCESSOR</type>
          </destination>
          <flowFileExpiration>0 sec</flowFileExpiration>
          <labelIndex>1</labelIndex>
          <name></name>
          <selectedRelationships>success</selectedRelationships>
          <source>
            <groupId>4238cef0-015b-1000-0000-000000000000</groupId>
            <id>424092fe-015b-1000-0000-000000000000</id>
            <type>PROCESSOR</type>
          </source>
          <zIndex>0</zIndex>
        </connections>
        <!-- Set Illegal Attribute (success) to funnel -->
        <connections>
          <id>4245bbc7-015b-1000-0000-000000000000</id>
          <parentGroupId>4238cef0-015b-1000-0000-000000000000</parentGroupId>
          <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
          <backPressureObjectThreshold>10000</backPressureObjectThreshold>
          <destination>
            <groupId>4238cef0-015b-1000-0000-000000000000</groupId>
            <id>46120bc0-015b-1000-0000-000000000000</id>
            <type>FUNNEL</type>
          </destination>
          <flowFileExpiration>0 sec</flowFileExpiration>
          <labelIndex>1</labelIndex>
          <name></name>
          <selectedRelationships>success</selectedRelationships>
          <source>
            <groupId>4238cef0-015b-1000-0000-000000000000</groupId>
            <id>42445b9b-015b-1000-0000-000000000000</id>
            <type>PROCESSOR</type>
          </source>
          <zIndex>0</zIndex>
        </connections>
        <!-- GenerateFlowFile (success) to Set Successful Attribute -->
        <connections>
          <id>43491382-015b-1000-0000-000000000000</id>
          <parentGroupId>4238cef0-015b-1000-0000-000000000000</parentGroupId>
          <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
          <backPressureObjectThreshold>10000</backPressureObjectThreshold>
          <destination>
            <groupId>4238cef0-015b-1000-0000-000000000000</groupId>
            <id>cfa13ad4-1426-1164-0000-000000000000</id>
            <type>PROCESSOR</type>
          </destination>
          <flowFileExpiration>0 sec</flowFileExpiration>
          <labelIndex>1</labelIndex>
          <name></name>
          <selectedRelationships>success</selectedRelationships>
          <source>
            <groupId>4238cef0-015b-1000-0000-000000000000</groupId>
            <id>424092fe-015b-1000-0000-000000000000</id>
            <type>PROCESSOR</type>
          </source>
          <zIndex>0</zIndex>
        </connections>
        <!-- Set Successful Attribute (success) to funnel -->
        <connections>
          <id>439b3125-015b-1000-0000-000000000000</id>
          <parentGroupId>4238cef0-015b-1000-0000-000000000000</parentGroupId>
          <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
          <backPressureObjectThreshold>10000</backPressureObjectThreshold>
          <destination>
            <groupId>4238cef0-015b-1000-0000-000000000000</groupId>
            <id>46120bc0-015b-1000-0000-000000000000</id>
            <type>FUNNEL</type>
          </destination>
          <flowFileExpiration>0 sec</flowFileExpiration>
          <labelIndex>1</labelIndex>
          <name></name>
          <selectedRelationships>success</selectedRelationships>
          <source>
            <groupId>4238cef0-015b-1000-0000-000000000000</groupId>
            <id>cfa13ad4-1426-1164-0000-000000000000</id>
            <type>PROCESSOR</type>
          </source>
          <zIndex>0</zIndex>
        </connections>
        <!-- Funnel to PutHiveQL (funnels have no relationships, hence no selectedRelationships) -->
        <connections>
          <id>46125d60-015b-1000-0000-000000000000</id>
          <parentGroupId>4238cef0-015b-1000-0000-000000000000</parentGroupId>
          <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
          <backPressureObjectThreshold>10000</backPressureObjectThreshold>
          <destination>
            <groupId>4238cef0-015b-1000-0000-000000000000</groupId>
            <id>4238f07e-015b-1000-0000-000000000000</id>
            <type>PROCESSOR</type>
          </destination>
          <flowFileExpiration>0 sec</flowFileExpiration>
          <labelIndex>1</labelIndex>
          <name></name>
          <source>
            <groupId>4238cef0-015b-1000-0000-000000000000</groupId>
            <id>46120bc0-015b-1000-0000-000000000000</id>
            <type>FUNNEL</type>
          </source>
          <zIndex>0</zIndex>
        </connections>
        <!--
          Hive JDBC connection pool referenced by PutHiveQL's
          "Hive Database Connection Pooling Service" property.
          NOTE(review): hive-db-connect-url and hive-db-user point at the
          author's environment; change them before enabling in another cluster.
        -->
        <controllerServices>
          <id>4239297e-015b-1000-0000-000000000000</id>
          <parentGroupId>4238cef0-015b-1000-0000-000000000000</parentGroupId>
          <comments></comments>
          <descriptors>
            <entry>
              <key>hive-db-connect-url</key>
              <value>
                <name>hive-db-connect-url</name>
              </value>
            </entry>
            <entry>
              <key>hive-config-resources</key>
              <value>
                <name>hive-config-resources</name>
              </value>
            </entry>
            <entry>
              <key>hive-db-user</key>
              <value>
                <name>hive-db-user</name>
              </value>
            </entry>
            <entry>
              <key>hive-db-password</key>
              <value>
                <name>hive-db-password</name>
              </value>
            </entry>
            <entry>
              <key>hive-max-wait-time</key>
              <value>
                <name>hive-max-wait-time</name>
              </value>
            </entry>
            <entry>
              <key>hive-max-total-connections</key>
              <value>
                <name>hive-max-total-connections</name>
              </value>
            </entry>
            <entry>
              <key>Validation-query</key>
              <value>
                <name>Validation-query</name>
              </value>
            </entry>
            <entry>
              <key>Kerberos Principal</key>
              <value>
                <name>Kerberos Principal</name>
              </value>
            </entry>
            <entry>
              <key>Kerberos Keytab</key>
              <value>
                <name>Kerberos Keytab</name>
              </value>
            </entry>
          </descriptors>
          <name>HiveConnectionPool</name>
          <properties>
            <entry>
              <key>hive-db-connect-url</key>
              <value>jdbc:hive2://1.hdp.aws.mine:10000/default</value>
            </entry>
            <entry>
              <key>hive-config-resources</key>
            </entry>
            <entry>
              <key>hive-db-user</key>
              <value>admin</value>
            </entry>
            <entry>
              <key>hive-db-password</key>
            </entry>
            <entry>
              <key>hive-max-wait-time</key>
              <value>500 millis</value>
            </entry>
            <entry>
              <key>hive-max-total-connections</key>
              <value>8</value>
            </entry>
            <entry>
              <key>Validation-query</key>
            </entry>
            <entry>
              <key>Kerberos Principal</key>
            </entry>
            <entry>
              <key>Kerberos Keytab</key>
            </entry>
          </properties>
          <state>ENABLED</state>
          <type>org.apache.nifi.dbcp.hive.HiveConnectionPool</type>
        </controllerServices>
        <!-- Merges the "successful" and "illegal" paths into PutHiveQL -->
        <funnels>
          <id>46120bc0-015b-1000-0000-000000000000</id>
          <parentGroupId>4238cef0-015b-1000-0000-000000000000</parentGroupId>
          <position>
            <x>631.6707546160042</x>
            <y>41.89602076857636</y>
          </position>
        </funnels>
        <!--
          Set Successful Attribute: supplies valid statement parameters.
          hiveql.args.1.value is ${UUID()} and hiveql.args.2.value is
          ${random():mod(10)}, an integer matching the INTEGER type that
          GenerateFlowFile sets for arg 2.
        -->
        <processors>
          <id>cfa13ad4-1426-1164-0000-000000000000</id>
          <parentGroupId>4238cef0-015b-1000-0000-000000000000</parentGroupId>
          <position>
            <x>239.34310913085938</x>
            <y>-143.70184326171875</y>
          </position>
          <config>
            <bulletinLevel>WARN</bulletinLevel>
            <comments></comments>
            <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
            <descriptors>
              <entry>
                <key>Delete Attributes Expression</key>
                <value>
                  <name>Delete Attributes Expression</name>
                </value>
              </entry>
              <entry>
                <key>hiveql.args.1.value</key>
                <value>
                  <name>hiveql.args.1.value</name>
                </value>
              </entry>
              <entry>
                <key>hiveql.args.2.value</key>
                <value>
                  <name>hiveql.args.2.value</name>
                </value>
              </entry>
              <entry>
                <key>Store State</key>
                <value>
                  <name>Store State</name>
                </value>
              </entry>
            </descriptors>
            <executionNode>ALL</executionNode>
            <lossTolerant>false</lossTolerant>
            <penaltyDuration>30 sec</penaltyDuration>
            <properties>
              <entry>
                <key>Delete Attributes Expression</key>
              </entry>
              <entry>
                <key>hiveql.args.1.value</key>
                <value>${UUID()}</value>
              </entry>
              <entry>
                <key>hiveql.args.2.value</key>
                <value>${random():mod(10)}</value>
              </entry>
              <entry>
                <key>Store State</key>
                <value>Do not store state</value>
              </entry>
            </properties>
            <runDurationMillis>0</runDurationMillis>
            <schedulingPeriod>0 sec</schedulingPeriod>
            <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
            <yieldDuration>1 sec</yieldDuration>
          </config>
          <name>Set Successful Attribute</name>
          <relationships>
            <autoTerminate>false</autoTerminate>
            <name>success</name>
          </relationships>
          <style></style>
          <type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
        </processors>
        <!--
          PutHiveQL under test: hive-batch-size 10, rollback-on-failure true.
          With rollback enabled, a failed FlowFile is not routed to failure or
          retry. It stays in the incoming queue and is processed again until it
          succeeds or is removed.
        -->
        <processors>
          <id>4238f07e-015b-1000-0000-000000000000</id>
          <parentGroupId>4238cef0-015b-1000-0000-000000000000</parentGroupId>
          <position>
            <x>480.0</x>
            <y>142.0</y>
          </position>
          <config>
            <bulletinLevel>WARN</bulletinLevel>
            <comments></comments>
            <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
            <descriptors>
              <entry>
                <key>Hive Database Connection Pooling Service</key>
                <value>
                  <identifiesControllerService>org.apache.nifi.dbcp.hive.HiveDBCPService</identifiesControllerService>
                  <name>Hive Database Connection Pooling Service</name>
                </value>
              </entry>
              <entry>
                <key>hive-batch-size</key>
                <value>
                  <name>hive-batch-size</name>
                </value>
              </entry>
              <entry>
                <key>hive-charset</key>
                <value>
                  <name>hive-charset</name>
                </value>
              </entry>
              <entry>
                <key>statement-delimiter</key>
                <value>
                  <name>statement-delimiter</name>
                </value>
              </entry>
              <entry>
                <key>rollback-on-failure</key>
                <value>
                  <name>rollback-on-failure</name>
                </value>
              </entry>
            </descriptors>
            <executionNode>ALL</executionNode>
            <lossTolerant>false</lossTolerant>
            <penaltyDuration>30 sec</penaltyDuration>
            <properties>
              <entry>
                <key>Hive Database Connection Pooling Service</key>
                <value>4239297e-015b-1000-0000-000000000000</value>
              </entry>
              <entry>
                <key>hive-batch-size</key>
                <value>10</value>
              </entry>
              <entry>
                <key>hive-charset</key>
                <value>UTF-8</value>
              </entry>
              <entry>
                <key>statement-delimiter</key>
                <value>;</value>
              </entry>
              <entry>
                <key>rollback-on-failure</key>
                <value>true</value>
              </entry>
            </properties>
            <runDurationMillis>0</runDurationMillis>
            <schedulingPeriod>0 sec</schedulingPeriod>
            <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
            <yieldDuration>1 sec</yieldDuration>
          </config>
          <name>PutHiveQL</name>
          <relationships>
            <autoTerminate>false</autoTerminate>
            <name>failure</name>
          </relationships>
          <relationships>
            <autoTerminate>false</autoTerminate>
            <name>retry</name>
          </relationships>
          <relationships>
            <autoTerminate>false</autoTerminate>
            <name>success</name>
          </relationships>
          <style></style>
          <type>org.apache.nifi.processors.hive.PutHiveQL</type>
        </processors>
        <!--
          Terminal UpdateAttribute: receives PutHiveQL's success, failure and
          retry output and sets no attributes.
          NOTE(review): its success relationship is neither connected nor
          auto-terminated, so NiFi will report it invalid and it cannot be
          started. Presumably it is meant to stay stopped so that queued
          FlowFiles can be inspected. Confirm with the author.
        -->
        <processors>
          <id>423cf50a-015b-1000-0000-000000000000</id>
          <parentGroupId>4238cef0-015b-1000-0000-000000000000</parentGroupId>
          <position>
            <x>479.0</x>
            <y>410.0</y>
          </position>
          <config>
            <bulletinLevel>WARN</bulletinLevel>
            <comments></comments>
            <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
            <descriptors>
              <entry>
                <key>Delete Attributes Expression</key>
                <value>
                  <name>Delete Attributes Expression</name>
                </value>
              </entry>
              <entry>
                <key>Store State</key>
                <value>
                  <name>Store State</name>
                </value>
              </entry>
            </descriptors>
            <executionNode>ALL</executionNode>
            <lossTolerant>false</lossTolerant>
            <penaltyDuration>30 sec</penaltyDuration>
            <properties>
              <entry>
                <key>Delete Attributes Expression</key>
              </entry>
              <entry>
                <key>Store State</key>
                <value>Do not store state</value>
              </entry>
            </properties>
            <runDurationMillis>0</runDurationMillis>
            <schedulingPeriod>0 sec</schedulingPeriod>
            <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
            <yieldDuration>1 sec</yieldDuration>
          </config>
          <name>UpdateAttribute</name>
          <relationships>
            <autoTerminate>false</autoTerminate>
            <name>success</name>
          </relationships>
          <style></style>
          <type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
        </processors>
        <!--
          GenerateFlowFile: scheduled every 1d with Batch Size 2. Each FlowFile
          carries the statement "insert into input values (?, ?)" and the
          dynamic properties hiveql.args.1.type=12 and hiveql.args.2.type=4.
          These are JDBC type codes: java.sql.Types.VARCHAR and INTEGER.
        -->
        <processors>
          <id>424092fe-015b-1000-0000-000000000000</id>
          <parentGroupId>4238cef0-015b-1000-0000-000000000000</parentGroupId>
          <position>
            <x>487.318115234375</x>
            <y>-382.4422912597656</y>
          </position>
          <config>
            <bulletinLevel>WARN</bulletinLevel>
            <comments></comments>
            <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
            <descriptors>
              <entry>
                <key>File Size</key>
                <value>
                  <name>File Size</name>
                </value>
              </entry>
              <entry>
                <key>Batch Size</key>
                <value>
                  <name>Batch Size</name>
                </value>
              </entry>
              <entry>
                <key>Data Format</key>
                <value>
                  <name>Data Format</name>
                </value>
              </entry>
              <entry>
                <key>Unique FlowFiles</key>
                <value>
                  <name>Unique FlowFiles</name>
                </value>
              </entry>
              <entry>
                <key>generate-ff-custom-text</key>
                <value>
                  <name>generate-ff-custom-text</name>
                </value>
              </entry>
              <entry>
                <key>hiveql.args.1.type</key>
                <value>
                  <name>hiveql.args.1.type</name>
                </value>
              </entry>
              <entry>
                <key>hiveql.args.2.type</key>
                <value>
                  <name>hiveql.args.2.type</name>
                </value>
              </entry>
            </descriptors>
            <executionNode>ALL</executionNode>
            <lossTolerant>false</lossTolerant>
            <penaltyDuration>30 sec</penaltyDuration>
            <properties>
              <entry>
                <key>File Size</key>
                <value>0B</value>
              </entry>
              <entry>
                <key>Batch Size</key>
                <value>2</value>
              </entry>
              <entry>
                <key>Data Format</key>
                <value>Text</value>
              </entry>
              <entry>
                <key>Unique FlowFiles</key>
                <value>false</value>
              </entry>
              <entry>
                <key>generate-ff-custom-text</key>
                <value>insert into input values (?, ?)</value>
              </entry>
              <entry>
                <key>hiveql.args.1.type</key>
                <value>12</value>
              </entry>
              <entry>
                <key>hiveql.args.2.type</key>
                <value>4</value>
              </entry>
            </properties>
            <runDurationMillis>0</runDurationMillis>
            <schedulingPeriod>1d</schedulingPeriod>
            <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
            <yieldDuration>1 sec</yieldDuration>
          </config>
          <name>GenerateFlowFile</name>
          <relationships>
            <autoTerminate>false</autoTerminate>
            <name>success</name>
          </relationships>
          <style></style>
          <type>org.apache.nifi.processors.standard.GenerateFlowFile</type>
        </processors>
        <!--
          Set Illegal Attribute: sets hiveql.args.1.value to ${UUID()} and
          hiveql.args.2.value to NOT_INTEGER. Arg 2 is typed INTEGER upstream,
          so PutHiveQL is expected to fail on these FlowFiles. That failure
          exercises rollback-on-failure.
        -->
        <processors>
          <id>42445b9b-015b-1000-0000-000000000000</id>
          <parentGroupId>4238cef0-015b-1000-0000-000000000000</parentGroupId>
          <position>
            <x>752.33740234375</x>
            <y>-140.97499084472656</y>
          </position>
          <config>
            <bulletinLevel>WARN</bulletinLevel>
            <comments></comments>
            <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
            <descriptors>
              <entry>
                <key>Delete Attributes Expression</key>
                <value>
                  <name>Delete Attributes Expression</name>
                </value>
              </entry>
              <entry>
                <key>hiveql.args.1.value</key>
                <value>
                  <name>hiveql.args.1.value</name>
                </value>
              </entry>
              <entry>
                <key>hiveql.args.2.value</key>
                <value>
                  <name>hiveql.args.2.value</name>
                </value>
              </entry>
              <entry>
                <key>Store State</key>
                <value>
                  <name>Store State</name>
                </value>
              </entry>
            </descriptors>
            <executionNode>ALL</executionNode>
            <lossTolerant>false</lossTolerant>
            <penaltyDuration>30 sec</penaltyDuration>
            <properties>
              <entry>
                <key>Delete Attributes Expression</key>
              </entry>
              <entry>
                <key>hiveql.args.1.value</key>
                <value>${UUID()}</value>
              </entry>
              <entry>
                <key>hiveql.args.2.value</key>
                <value>NOT_INTEGER</value>
              </entry>
              <entry>
                <key>Store State</key>
                <value>Do not store state</value>
              </entry>
            </properties>
            <runDurationMillis>0</runDurationMillis>
            <schedulingPeriod>0 sec</schedulingPeriod>
            <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
            <yieldDuration>1 sec</yieldDuration>
          </config>
          <name>Set Illegal Attribute</name>
          <relationships>
            <autoTerminate>false</autoTerminate>
            <name>success</name>
          </relationships>
          <style></style>
          <type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
        </processors>
      </contents>
      <name>NIFI-3415 PutHiveQL</name>
    </processGroups>
  </snippet>
  <timestamp>04/07/2017 03:04:49 UTC</timestamp>
</template>
View raw

(Sorry about that, but we can’t show files that are this big right now.)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment