Skip to content

Instantly share code, notes, and snippets.

@mattyb149
Created January 2, 2017 17:43
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mattyb149/0d8dd39a7c824fc1e42fc444e09d55a0 to your computer and use it in GitHub Desktop.
Save mattyb149/0d8dd39a7c824fc1e42fc444e09d55a0 to your computer and use it in GitHub Desktop.
NiFi Template for using GenerateTableFetch with a Remote Process Group to do parallel fetch with ExecuteSQL
<?xml version="1.0" ?>
<template encoding-version="1.0">
<!-- Template metadata: free-text description of the flow pattern, followed by the
     template's group id and its name. The description text is kept exactly as
     authored, including its line breaks, because whitespace inside it is data. -->
<description>This template provides a pattern for using GenerateTableFetch on the primary node
to generate multiple flow files, each one containing a SQL query to be executed in parallel by a cluster.
The flow files are transported using a Remote Process Group back to the same cluster, where they
can be executed in parallel by the ExecuteSQL processor. To increase parallelism, you can add more nodes
to the cluster. To increase concurrency, you can increase the number of
concurrent tasks for each ExecuteSQL instance.</description>
<groupId>03a0dc51-0159-1000-87c0-b1527877d72e</groupId>
<name>GenerateTableFetchExample</name>
<snippet>
<connections>
  <!-- Connection from the local input port 603eb749 ("Load Balance to Multiple
       ExecuteSQLs") to the processor 603ed148 (ExecuteSQL). The source is a
       port, so no selectedRelationships element is present. -->
  <id>603edcc4-0159-1000-0000-000000000000</id>
  <parentGroupId>03a0dc51-0159-1000-0000-000000000000</parentGroupId>
  <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
  <backPressureObjectThreshold>10000</backPressureObjectThreshold>
  <destination>
    <groupId>03a0dc51-0159-1000-0000-000000000000</groupId>
    <id>603ed148-0159-1000-0000-000000000000</id>
    <type>PROCESSOR</type>
  </destination>
  <flowFileExpiration>0 sec</flowFileExpiration>
  <labelIndex>1</labelIndex>
  <name/>
  <source>
    <groupId>03a0dc51-0159-1000-0000-000000000000</groupId>
    <id>603eb749-0159-1000-0000-000000000000</id>
    <type>INPUT_PORT</type>
  </source>
  <zIndex>0</zIndex>
</connections>
<connections>
  <!-- Connection carrying the "success" relationship of processor 3487fb86
       (GenerateTableFetch) to the remote input port 603eb749...fa9a9a069eb9,
       which belongs to the remote process group 603e8377. -->
  <id>603f44ce-0159-1000-0000-000000000000</id>
  <parentGroupId>03a0dc51-0159-1000-0000-000000000000</parentGroupId>
  <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
  <backPressureObjectThreshold>10000</backPressureObjectThreshold>
  <destination>
    <!-- For a remote port the groupId is the id of the remote process group. -->
    <groupId>603e8377-0159-1000-0000-000000000000</groupId>
    <id>603eb749-0159-1000-7ad5-fa9a9a069eb9</id>
    <type>REMOTE_INPUT_PORT</type>
  </destination>
  <flowFileExpiration>0 sec</flowFileExpiration>
  <labelIndex>1</labelIndex>
  <name/>
  <selectedRelationships>success</selectedRelationships>
  <source>
    <groupId>03a0dc51-0159-1000-0000-000000000000</groupId>
    <id>3487fb86-48fc-30f2-0000-000000000000</id>
    <type>PROCESSOR</type>
  </source>
  <zIndex>0</zIndex>
</connections>
<connections>
  <!-- Connection carrying the "success" relationship of processor 603ed148
       (ExecuteSQL) to processor 603ffc30 (ConvertAvroToJSON). -->
  <id>60400bb1-0159-1000-0000-000000000000</id>
  <parentGroupId>03a0dc51-0159-1000-0000-000000000000</parentGroupId>
  <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
  <backPressureObjectThreshold>10000</backPressureObjectThreshold>
  <destination>
    <groupId>03a0dc51-0159-1000-0000-000000000000</groupId>
    <id>603ffc30-0159-1000-0000-000000000000</id>
    <type>PROCESSOR</type>
  </destination>
  <flowFileExpiration>0 sec</flowFileExpiration>
  <labelIndex>1</labelIndex>
  <name/>
  <selectedRelationships>success</selectedRelationships>
  <source>
    <groupId>03a0dc51-0159-1000-0000-000000000000</groupId>
    <id>603ed148-0159-1000-0000-000000000000</id>
    <type>PROCESSOR</type>
  </source>
  <zIndex>0</zIndex>
</connections>
<controllerServices>
  <!-- DBCPConnectionPool controller service named "MySqlConnectionPool".
       Its id (b00f6723) is the value of the "Database Connection Pooling
       Service" property on the GenerateTableFetch and ExecuteSQL processors. -->
  <id>b00f6723-b9bb-3ab7-0000-000000000000</id>
  <parentGroupId>03a0dc51-0159-1000-0000-000000000000</parentGroupId>
  <comments/>
  <!-- Property descriptors: one entry per property the service exposes. -->
  <descriptors>
    <entry>
      <key>Database Connection URL</key>
      <value>
        <name>Database Connection URL</name>
      </value>
    </entry>
    <entry>
      <key>Database Driver Class Name</key>
      <value>
        <name>Database Driver Class Name</name>
      </value>
    </entry>
    <entry>
      <key>database-driver-locations</key>
      <value>
        <name>database-driver-locations</name>
      </value>
    </entry>
    <entry>
      <key>Database User</key>
      <value>
        <name>Database User</name>
      </value>
    </entry>
    <entry>
      <key>Password</key>
      <value>
        <name>Password</name>
      </value>
    </entry>
    <entry>
      <key>Max Wait Time</key>
      <value>
        <name>Max Wait Time</name>
      </value>
    </entry>
    <entry>
      <key>Max Total Connections</key>
      <value>
        <name>Max Total Connections</name>
      </value>
    </entry>
    <entry>
      <key>Validation-query</key>
      <value>
        <name>Validation-query</name>
      </value>
    </entry>
  </descriptors>
  <name>MySqlConnectionPool</name>
  <!-- Property values.
       NOTE(review): the JDBC URL, the driver jar path and the user below look
       specific to the template author's machine; expect to change them after
       importing the template. -->
  <properties>
    <entry>
      <key>Database Connection URL</key>
      <value>jdbc:mysql://192.168.99.100:32768/test_schema</value>
    </entry>
    <entry>
      <key>Database Driver Class Name</key>
      <value>com.mysql.jdbc.Driver</value>
    </entry>
    <entry>
      <key>database-driver-locations</key>
      <value>file:///Users/mburgess/jdbc_drivers/mysql-connector-java-5.1.38-bin.jar</value>
    </entry>
    <entry>
      <key>Database User</key>
      <value>root</value>
    </entry>
    <!-- No value element is present for Password; it has to be supplied after
         import if the database requires one. -->
    <entry>
      <key>Password</key>
    </entry>
    <entry>
      <key>Max Wait Time</key>
      <value>500 millis</value>
    </entry>
    <entry>
      <key>Max Total Connections</key>
      <value>8</value>
    </entry>
    <!-- No value element is present for Validation-query. -->
    <entry>
      <key>Validation-query</key>
    </entry>
  </properties>
  <state>ENABLED</state>
  <type>org.apache.nifi.dbcp.DBCPConnectionPool</type>
</controllerServices>
<inputPorts>
  <!-- Local input port "Load Balance to Multiple ExecuteSQLs". It is the source
       of connection 603edcc4, whose destination is the ExecuteSQL processor.
       The remote process group in this template lists a port with the same
       name in its contents. -->
  <id>603eb749-0159-1000-0000-000000000000</id>
  <parentGroupId>03a0dc51-0159-1000-0000-000000000000</parentGroupId>
  <position>
    <x>527.0</x>
    <y>7.0</y>
  </position>
  <comments/>
  <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
  <name>Load Balance to Multiple ExecuteSQLs</name>
  <state>STOPPED</state>
  <transmitting>false</transmitting>
  <type>INPUT_PORT</type>
</inputPorts>
<processors>
  <!-- GenerateTableFetch processor. Per the template description it runs on the
       primary node and emits flow files that each contain a SQL query; its
       "success" relationship is wired to the remote input port by connection
       603f44ce. -->
  <id>3487fb86-48fc-30f2-0000-000000000000</id>
  <parentGroupId>03a0dc51-0159-1000-0000-000000000000</parentGroupId>
  <position>
    <x>12.0</x>
    <y>0.0</y>
  </position>
  <config>
    <bulletinLevel>WARN</bulletinLevel>
    <comments/>
    <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
    <!-- Property descriptors: one entry per property the processor exposes. -->
    <descriptors>
      <entry>
        <key>Database Connection Pooling Service</key>
        <value>
          <identifiesControllerService>org.apache.nifi.dbcp.DBCPService</identifiesControllerService>
          <name>Database Connection Pooling Service</name>
        </value>
      </entry>
      <entry>
        <key>db-fetch-db-type</key>
        <value>
          <name>db-fetch-db-type</name>
        </value>
      </entry>
      <entry>
        <key>Table Name</key>
        <value>
          <name>Table Name</name>
        </value>
      </entry>
      <entry>
        <key>Columns to Return</key>
        <value>
          <name>Columns to Return</name>
        </value>
      </entry>
      <entry>
        <key>Maximum-value Columns</key>
        <value>
          <name>Maximum-value Columns</name>
        </value>
      </entry>
      <entry>
        <key>Max Wait Time</key>
        <value>
          <name>Max Wait Time</name>
        </value>
      </entry>
      <entry>
        <key>gen-table-fetch-partition-size</key>
        <value>
          <name>gen-table-fetch-partition-size</name>
        </value>
      </entry>
    </descriptors>
    <!-- Primary-node execution is expressed through executionNode together with
         TIMER_DRIVEN scheduling. The PRIMARY_NODE_ONLY scheduling strategy that
         older flows used for this purpose is deprecated in NiFi 1.x, and pairing
         it with executionNode ALL contradicts the intent stated in the template
         description. -->
    <executionNode>PRIMARY</executionNode>
    <lossTolerant>false</lossTolerant>
    <penaltyDuration>30 sec</penaltyDuration>
    <properties>
      <!-- References the MySqlConnectionPool controller service by id. -->
      <entry>
        <key>Database Connection Pooling Service</key>
        <value>b00f6723-b9bb-3ab7-0000-000000000000</value>
      </entry>
      <entry>
        <key>db-fetch-db-type</key>
        <value>Generic</value>
      </entry>
      <entry>
        <key>Table Name</key>
        <value>users</value>
      </entry>
      <!-- No value element is present for Columns to Return. -->
      <entry>
        <key>Columns to Return</key>
      </entry>
      <entry>
        <key>Maximum-value Columns</key>
        <value>id</value>
      </entry>
      <entry>
        <key>Max Wait Time</key>
        <value>0 seconds</value>
      </entry>
      <!-- NOTE(review): a partition size of 2 looks like a demo setting chosen to
           produce many small queries; confirm before using on a real table. -->
      <entry>
        <key>gen-table-fetch-partition-size</key>
        <value>2</value>
      </entry>
    </properties>
    <runDurationMillis>0</runDurationMillis>
    <schedulingPeriod>0 sec</schedulingPeriod>
    <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
    <yieldDuration>1 sec</yieldDuration>
  </config>
  <name>GenerateTableFetch</name>
  <relationships>
    <autoTerminate>false</autoTerminate>
    <name>success</name>
  </relationships>
  <style/>
  <type>org.apache.nifi.processors.standard.GenerateTableFetch</type>
</processors>
<processors>
  <!-- ExecuteSQL processor. It receives flow files from the local input port
       via connection 603edcc4 and sends "success" to ConvertAvroToJSON via
       connection 60400bb1. -->
  <id>603ed148-0159-1000-0000-000000000000</id>
  <parentGroupId>03a0dc51-0159-1000-0000-000000000000</parentGroupId>
  <position>
    <x>478.0</x>
    <y>180.0</y>
  </position>
  <config>
    <bulletinLevel>WARN</bulletinLevel>
    <comments/>
    <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
    <!-- Property descriptors: one entry per property the processor exposes. -->
    <descriptors>
      <entry>
        <key>Database Connection Pooling Service</key>
        <value>
          <identifiesControllerService>org.apache.nifi.dbcp.DBCPService</identifiesControllerService>
          <name>Database Connection Pooling Service</name>
        </value>
      </entry>
      <entry>
        <key>SQL select query</key>
        <value>
          <name>SQL select query</name>
        </value>
      </entry>
      <entry>
        <key>Max Wait Time</key>
        <value>
          <name>Max Wait Time</name>
        </value>
      </entry>
      <entry>
        <key>dbf-normalize</key>
        <value>
          <name>dbf-normalize</name>
        </value>
      </entry>
    </descriptors>
    <executionNode>ALL</executionNode>
    <lossTolerant>false</lossTolerant>
    <penaltyDuration>30 sec</penaltyDuration>
    <properties>
      <!-- References the MySqlConnectionPool controller service by id. -->
      <entry>
        <key>Database Connection Pooling Service</key>
        <value>b00f6723-b9bb-3ab7-0000-000000000000</value>
      </entry>
      <!-- No value element is present for the query; presumably the SQL comes
           from each incoming flow file, as the template description says.
           TODO confirm against the ExecuteSQL documentation. -->
      <entry>
        <key>SQL select query</key>
      </entry>
      <entry>
        <key>Max Wait Time</key>
        <value>0 seconds</value>
      </entry>
      <entry>
        <key>dbf-normalize</key>
        <value>false</value>
      </entry>
    </properties>
    <runDurationMillis>0</runDurationMillis>
    <schedulingPeriod>0 sec</schedulingPeriod>
    <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
    <yieldDuration>1 sec</yieldDuration>
  </config>
  <name>ExecuteSQL</name>
  <!-- "failure" is auto-terminated; "success" is not and is wired onward.
       NOTE(review): with failure auto-terminated, failed queries are not routed
       anywhere in this template; consider whether that is acceptable. -->
  <relationships>
    <autoTerminate>true</autoTerminate>
    <name>failure</name>
  </relationships>
  <relationships>
    <autoTerminate>false</autoTerminate>
    <name>success</name>
  </relationships>
  <style/>
  <type>org.apache.nifi.processors.standard.ExecuteSQL</type>
</processors>
<processors>
  <!-- ConvertAvroToJSON processor. It is the destination of connection 60400bb1
       (the "success" output of ExecuteSQL) and is the last processor in this
       template: both of its relationships are auto-terminated. -->
  <id>603ffc30-0159-1000-0000-000000000000</id>
  <parentGroupId>03a0dc51-0159-1000-0000-000000000000</parentGroupId>
  <position>
    <x>482.0</x>
    <y>395.0</y>
  </position>
  <config>
    <bulletinLevel>WARN</bulletinLevel>
    <comments/>
    <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
    <!-- Property descriptors: one entry per property the processor exposes. -->
    <descriptors>
      <entry>
        <key>JSON container options</key>
        <value>
          <name>JSON container options</name>
        </value>
      </entry>
      <entry>
        <key>Wrap Single Record</key>
        <value>
          <name>Wrap Single Record</name>
        </value>
      </entry>
      <entry>
        <key>Avro schema</key>
        <value>
          <name>Avro schema</name>
        </value>
      </entry>
    </descriptors>
    <executionNode>ALL</executionNode>
    <lossTolerant>false</lossTolerant>
    <penaltyDuration>30 sec</penaltyDuration>
    <properties>
      <entry>
        <key>JSON container options</key>
        <value>array</value>
      </entry>
      <entry>
        <key>Wrap Single Record</key>
        <value>false</value>
      </entry>
      <!-- No value element is present for Avro schema. -->
      <entry>
        <key>Avro schema</key>
      </entry>
    </properties>
    <runDurationMillis>0</runDurationMillis>
    <schedulingPeriod>0 sec</schedulingPeriod>
    <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
    <yieldDuration>1 sec</yieldDuration>
  </config>
  <name>ConvertAvroToJSON</name>
  <relationships>
    <autoTerminate>true</autoTerminate>
    <name>failure</name>
  </relationships>
  <relationships>
    <autoTerminate>true</autoTerminate>
    <name>success</name>
  </relationships>
  <style/>
  <type>org.apache.nifi.processors.avro.ConvertAvroToJSON</type>
</processors>
<remoteProcessGroups>
  <!-- Remote process group targeting http://localhost:8989/nifi over the RAW
       transport protocol. Per the template description it points back at the
       same cluster so the generated queries are spread across its nodes.
       NOTE(review): the target URI is a localhost address; expect to change it
       to the real cluster URL after import. -->
  <id>603e8377-0159-1000-0000-000000000000</id>
  <parentGroupId>03a0dc51-0159-1000-0000-000000000000</parentGroupId>
  <position>
    <x>0.0</x>
    <y>285.0</y>
  </position>
  <communicationsTimeout>30 sec</communicationsTimeout>
  <contents>
    <!-- The remote input port that connection 603f44ce delivers to. Its name
         matches the local input port "Load Balance to Multiple ExecuteSQLs". -->
    <inputPorts>
      <comments/>
      <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
      <connected>true</connected>
      <exists>true</exists>
      <id>603eb749-0159-1000-7ad5-fa9a9a069eb9</id>
      <name>Load Balance to Multiple ExecuteSQLs</name>
      <targetRunning>false</targetRunning>
      <transmitting>true</transmitting>
      <useCompression>false</useCompression>
    </inputPorts>
  </contents>
  <proxyHost/>
  <proxyUser/>
  <targetUri>http://localhost:8989/nifi</targetUri>
  <targetUris>http://localhost:8989/nifi</targetUris>
  <transportProtocol>RAW</transportProtocol>
  <yieldDuration>10 sec</yieldDuration>
</remoteProcessGroups>
</snippet>
<timestamp>01/02/2017 12:40:31 EST</timestamp>
</template>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment