Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Apache NiFi flow template for inserting data into relational databases. See https://www.batchiq.com/database-ingest-with-nifi.html for details.
<?xml version="1.0" encoding="UTF-8" standalone="yes"?><template><description>Demonstrates a simple pattern for inserting FlowFiles into a relational database using the PutSQL processor.</description><name>Database Insert</name><snippet><processGroups><id>f478ace0-9295-4ee9-9a25-96e3b1c3e94f</id><parentGroupId>23b56e53-a310-48e7-8a15-99d81109bdd9</parentGroupId><position><x>387.0</x><y>24.0</y></position><activeRemotePortCount>0</activeRemotePortCount><comments></comments><contents><connections><id>5e7de26e-13db-4b1b-8010-8f19f6611d49</id><parentGroupId>f478ace0-9295-4ee9-9a25-96e3b1c3e94f</parentGroupId><backPressureDataSizeThreshold>0 MB</backPressureDataSizeThreshold><backPressureObjectThreshold>0</backPressureObjectThreshold><bends><x>1030.955965197994</x><y>203.67437167941813</y></bends><bends><x>1067.0</x><y>253.0</y></bends><destination><groupId>f478ace0-9295-4ee9-9a25-96e3b1c3e94f</groupId><id>a0b3418c-096e-4138-9400-043dcc7da5ba</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>failure</selectedRelationships><source><groupId>f478ace0-9295-4ee9-9a25-96e3b1c3e94f</groupId><id>a0b3418c-096e-4138-9400-043dcc7da5ba</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>b8731643-134e-470f-ae2b-fd101f8bb1ed</id><parentGroupId>f478ace0-9295-4ee9-9a25-96e3b1c3e94f</parentGroupId><backPressureDataSizeThreshold>0 MB</backPressureDataSizeThreshold><backPressureObjectThreshold>0</backPressureObjectThreshold><destination><groupId>f478ace0-9295-4ee9-9a25-96e3b1c3e94f</groupId><id>f3483a86-d840-420d-a30b-8ee4f0d443db</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>success</selectedRelationships><source><groupId>f478ace0-9295-4ee9-9a25-96e3b1c3e94f</groupId><id>a0b3418c-096e-4138-9400-043dcc7da5ba</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>36a70a2f-8290-4708-814d-c0d8111e6a9b</id><parentGroupId>f478ace0-9295-4ee9-9a25-96e3b1c3e94f</parentGroupId><backPressureDataSizeThreshold>0 MB</backPressureDataSizeThreshold><backPressureObjectThreshold>0</backPressureObjectThreshold><bends><x>1026.0</x><y>400.0</y></bends><bends><x>1067.0</x><y>367.0</y></bends><destination><groupId>f478ace0-9295-4ee9-9a25-96e3b1c3e94f</groupId><id>f3483a86-d840-420d-a30b-8ee4f0d443db</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>failure</selectedRelationships><source><groupId>f478ace0-9295-4ee9-9a25-96e3b1c3e94f</groupId><id>f3483a86-d840-420d-a30b-8ee4f0d443db</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>14c1b77a-5bf9-41f8-a552-8ffd320ea560</id><parentGroupId>f478ace0-9295-4ee9-9a25-96e3b1c3e94f</parentGroupId><backPressureDataSizeThreshold>0 MB</backPressureDataSizeThreshold><backPressureObjectThreshold>0</backPressureObjectThreshold><destination><groupId>f478ace0-9295-4ee9-9a25-96e3b1c3e94f</groupId><id>a0b3418c-096e-4138-9400-043dcc7da5ba</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>success</selectedRelationships><source><groupId>f478ace0-9295-4ee9-9a25-96e3b1c3e94f</groupId><id>7b7ad079-f880-4f5f-b631-4fdfa05e411f</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>b8b7628b-00fe-4794-a683-6b07d00133c7</id><parentGroupId>f478ace0-9295-4ee9-9a25-96e3b1c3e94f</parentGroupId><backPressureDataSizeThreshold>0 MB</backPressureDataSizeThreshold><backPressureObjectThreshold>0</backPressureObjectThreshold><destination><groupId>f478ace0-9295-4ee9-9a25-96e3b1c3e94f</groupId><id>7b7ad079-f880-4f5f-b631-4fdfa05e411f</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><source><groupId>f478ace0-9295-4ee9-9a25-96e3b1c3e94f</groupId><id>925ef98e-2b5c-43de-bc01-ce39c536b989</id><type>INPUT_PORT</type></source><zIndex>0</zIndex></connections><connections><id>3b52bfce-ae76-4e50-aadd-b7e115fedcc8</id><parentGroupId>f478ace0-9295-4ee9-9a25-96e3b1c3e94f</parentGroupId><backPressureDataSizeThreshold>0 MB</backPressureDataSizeThreshold><backPressureObjectThreshold>0</backPressureObjectThreshold><bends><x>1027.0</x><y>418.0</y></bends><bends><x>1068.0</x><y>449.0</y></bends><destination><groupId>f478ace0-9295-4ee9-9a25-96e3b1c3e94f</groupId><id>f3483a86-d840-420d-a30b-8ee4f0d443db</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>retry</selectedRelationships><source><groupId>f478ace0-9295-4ee9-9a25-96e3b1c3e94f</groupId><id>f3483a86-d840-420d-a30b-8ee4f0d443db</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>a1181360-5d07-4507-b241-d53f3fd15ce8</id><parentGroupId>f478ace0-9295-4ee9-9a25-96e3b1c3e94f</parentGroupId><backPressureDataSizeThreshold>0 MB</backPressureDataSizeThreshold><backPressureObjectThreshold>0</backPressureObjectThreshold><destination><groupId>f478ace0-9295-4ee9-9a25-96e3b1c3e94f</groupId><id>1c9811df-3e05-4612-a08d-072659ba1dcc</id><type>OUTPUT_PORT</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>success</selectedRelationships><source><groupId>f478ace0-9295-4ee9-9a25-96e3b1c3e94f</groupId><id>f3483a86-d840-420d-a30b-8ee4f0d443db</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><inputPorts><id>925ef98e-2b5c-43de-bc01-ce39c536b989</id><parentGroupId>f478ace0-9295-4ee9-9a25-96e3b1c3e94f</parentGroupId><position><x>214.9999900266705</x><y>20.00000484752035</y></position><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><name>Input</name><state>STOPPED</state><type>INPUT_PORT</type></inputPorts><outputPorts><id>1c9811df-3e05-4612-a08d-072659ba1dcc</id><parentGroupId>f478ace0-9295-4ee9-9a25-96e3b1c3e94f</parentGroupId><position><x>213.9999900266705</x><y>398.00000484752036</y></position><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><name>Output</name><state>STOPPED</state><type>OUTPUT_PORT</type><validationErrors>'Port 'Output'' is invalid because Output connection for port 'Output' is not defined.</validationErrors></outputPorts><processors><id>7b7ad079-f880-4f5f-b631-4fdfa05e411f</id><parentGroupId>f478ace0-9295-4ee9-9a25-96e3b1c3e94f</parentGroupId><position><x>620.9999900266705</x><y>-12.999991337782376</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><defaultConcurrentTasks><entry><key>TIMER_DRIVEN</key><value>1</value></entry><entry><key>EVENT_DRIVEN</key><value>0</value></entry><entry><key>CRON_DRIVEN</key><value>1</value></entry></defaultConcurrentTasks><defaultSchedulingPeriod><entry><key>TIMER_DRIVEN</key><value>0 sec</value></entry><entry><key>CRON_DRIVEN</key><value>* * * * * ?</value></entry></defaultSchedulingPeriod><descriptors><entry><key>Delete Attributes Expression</key><value><description>Regular expression for attributes to be deleted from flowfiles.</description><displayName>Delete Attributes Expression</displayName><dynamic>false</dynamic><name>Delete Attributes Expression</name><required>false</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry><entry><key>datetime</key><value><description></description><displayName>datetime</displayName><dynamic>true</dynamic><name>datetime</name><required>false</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry><entry><key>message</key><value><description></description><displayName>message</displayName><dynamic>true</dynamic><name>message</name><required>false</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry></descriptors><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Delete Attributes Expression</key></entry><entry><key>datetime</key><value>${now():format(&quot;yyyy-MM-dd HH:mm:ss&quot;)}</value></entry><entry><key>message</key><value>Hello, PutSQL!</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>Extract Attributes</name><relationships><autoTerminate>false</autoTerminate><description>All FlowFiles are routed to this relationship</description><name>success</name></relationships><state>STOPPED</state><style/><supportsEventDriven>true</supportsEventDriven><supportsParallelProcessing>true</supportsParallelProcessing><type>org.apache.nifi.processors.attributes.UpdateAttribute</type></processors><processors><id>a0b3418c-096e-4138-9400-043dcc7da5ba</id><parentGroupId>f478ace0-9295-4ee9-9a25-96e3b1c3e94f</parentGroupId><position><x>621.9559651979939</x><y>178.67437167941813</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><defaultConcurrentTasks><entry><key>TIMER_DRIVEN</key><value>1</value></entry><entry><key>EVENT_DRIVEN</key><value>0</value></entry><entry><key>CRON_DRIVEN</key><value>1</value></entry></defaultConcurrentTasks><defaultSchedulingPeriod><entry><key>TIMER_DRIVEN</key><value>0 sec</value></entry><entry><key>CRON_DRIVEN</key><value>* * * * * ?</value></entry></defaultSchedulingPeriod><descriptors><entry><key>Regular Expression</key><value><defaultValue>(?s:^.*$)</defaultValue><description>The Search Value to search for in the FlowFile content. Only used for 'Literal Replace' and 'Regex Replace' matching strategies</description><displayName>Search Value</displayName><dynamic>false</dynamic><name>Regular Expression</name><required>true</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry><entry><key>Replacement Value</key><value><defaultValue>$1</defaultValue><description>The value to insert using the 'Replacement Strategy'. Using &quot;Regex Replace&quot; back-references to Regular Expression capturing groups are supported, but back-references that reference capturing groups that do not exist in the regular expression will be treated as literal value. Back References may also be referenced using the Expression Language, as '$1', '$2', etc. The single-tick marks MUST be included, as these variables are not &quot;Standard&quot; attribute names (attribute names must be quoted unless they contain only numbers, letters, and _).</description><displayName>Replacement Value</displayName><dynamic>false</dynamic><name>Replacement Value</name><required>true</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry><entry><key>Character Set</key><value><defaultValue>UTF-8</defaultValue><description>The Character Set in which the file is encoded</description><displayName>Character Set</displayName><dynamic>false</dynamic><name>Character Set</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Maximum Buffer Size</key><value><defaultValue>1 MB</defaultValue><description>Specifies the maximum amount of data to buffer (per file or per line, depending on the Evaluation Mode) in order to apply the replacement. If 'Entire Text' (in Evaluation Mode) is selected and the FlowFile is larger than this value, the FlowFile will be routed to 'failure'. In 'Line-by-Line' Mode, if a single line is larger than this value, the FlowFile will be routed to 'failure'. A default value of 1 MB is provided, primarily for 'Entire Text' mode. In 'Line-by-Line' Mode, a value such as 8 KB or 16 KB is suggested. This value is ignored if the &lt;Replacement Strategy&gt; property is set to one of: Append, Prepend, Always Replace</description><displayName>Maximum Buffer Size</displayName><dynamic>false</dynamic><name>Maximum Buffer Size</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Replacement Strategy</key><value><allowableValues><description>Insert the Replacement Value at the beginning of the FlowFile or the beginning of each line (depending on the Evaluation Mode). For &quot;Line-by-Line&quot; Evaluation Mode, the value will be prepended to each line. For &quot;Entire Text&quot; evaluation mode, the value will be prepended to the entire text.</description><displayName>Prepend</displayName><value>Prepend</value></allowableValues><allowableValues><description>Insert the Replacement Value at the end of the FlowFile or the end of each line (depending on the Evluation Mode). For &quot;Line-by-Line&quot; Evaluation Mode, the value will be appended to each line. For &quot;Entire Text&quot; evaluation mode, the value will be appended to the entire text.</description><displayName>Append</displayName><value>Append</value></allowableValues><allowableValues><description>Interpret the Search Value as a Regular Expression and replace all matches with the Replacement Value. The Replacement Value may reference Capturing Groups used in the Search Value by using a dollar-sign followed by the Capturing Group number, such as $1 or $2. If the Search Value is set to .* then everything is replaced without even evaluating the Regular Expression.</description><displayName>Regex Replace</displayName><value>Regex Replace</value></allowableValues><allowableValues><description>Search for all instances of the Search Value and replace the matches with the Replacement Value.</description><displayName>Literal Replace</displayName><value>Literal Replace</value></allowableValues><allowableValues><description>Always replaces the entire line or the entire contents of the FlowFile (depending on the value of the &lt;Evaluation Mode&gt; property) and does not bother searching for any value. When this strategy is chosen, the &lt;Search Value&gt; property is ignored.</description><displayName>Always Replace</displayName><value>Always Replace</value></allowableValues><defaultValue>Regex Replace</defaultValue><description>The strategy for how and what to replace within the FlowFile's text content.</description><displayName>Replacement Strategy</displayName><dynamic>false</dynamic><name>Replacement Strategy</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Evaluation Mode</key><value><allowableValues><displayName>Line-by-Line</displayName><value>Line-by-Line</value></allowableValues><allowableValues><displayName>Entire text</displayName><value>Entire text</value></allowableValues><defaultValue>Entire text</defaultValue><description>Run the 'Replacement Strategy' against each line separately (Line-by-Line) or buffer the entire file into memory (Entire Text) and run against that.</description><displayName>Evaluation Mode</displayName><dynamic>false</dynamic><name>Evaluation Mode</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry></descriptors><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Regular Expression</key><value>(?s:^.*$)</value></entry><entry><key>Replacement Value</key><value>INSERT INTO FlowFiles (uuid, filename, datetime, message)
VALUES ('${uuid}', '${filename}', '${datetime}', '${message}')</value></entry><entry><key>Character Set</key><value>UTF-8</value></entry><entry><key>Maximum Buffer Size</key><value>1 MB</value></entry><entry><key>Replacement Strategy</key><value>Always Replace</value></entry><entry><key>Evaluation Mode</key><value>Entire text</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>Format INSERT Statement</name><relationships><autoTerminate>false</autoTerminate><description>FlowFiles that could not be updated are routed to this relationship</description><name>failure</name></relationships><relationships><autoTerminate>false</autoTerminate><description>FlowFiles that have been successfully processed are routed to this relationship. This includes both FlowFiles that had text replaced and those that did not.</description><name>success</name></relationships><state>STOPPED</state><style/><supportsEventDriven>true</supportsEventDriven><supportsParallelProcessing>true</supportsParallelProcessing><type>org.apache.nifi.processors.standard.ReplaceText</type></processors><processors><id>f3483a86-d840-420d-a30b-8ee4f0d443db</id><parentGroupId>f478ace0-9295-4ee9-9a25-96e3b1c3e94f</parentGroupId><position><x>620.0690171481089</x><y>365.7668508104113</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><defaultConcurrentTasks><entry><key>TIMER_DRIVEN</key><value>1</value></entry><entry><key>EVENT_DRIVEN</key><value>0</value></entry><entry><key>CRON_DRIVEN</key><value>1</value></entry></defaultConcurrentTasks><defaultSchedulingPeriod><entry><key>TIMER_DRIVEN</key><value>0 sec</value></entry><entry><key>CRON_DRIVEN</key><value>* * * * * ?</value></entry></defaultSchedulingPeriod><descriptors><entry><key>JDBC Connection Pool</key><value><description>Specifies the JDBC Connection Pool to use in order to convert the JSON message to a SQL statement. The Connection Pool is necessary in order to determine the appropriate database column types.</description><displayName>JDBC Connection Pool</displayName><dynamic>false</dynamic><identifiesControllerService>org.apache.nifi.dbcp.DBCPService</identifiesControllerService><name>JDBC Connection Pool</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Support Fragmented Transactions</key><value><allowableValues><displayName>true</displayName><value>true</value></allowableValues><allowableValues><displayName>false</displayName><value>false</value></allowableValues><defaultValue>true</defaultValue><description>If true, when a FlowFile is consumed by this Processor, the Processor will first check the fragment.identifier and fragment.count attributes of that FlowFile. If the fragment.count value is greater than 1, the Processor will not process any FlowFile will that fragment.identifier until all are available; at that point, it will process all FlowFiles with that fragment.identifier as a single transaction, in the order specified by the FlowFiles' fragment.index attributes. This Provides atomicity of those SQL statements. If this value is false, these attributes will be ignored and the updates will occur independent of one another.</description><displayName>Support Fragmented Transactions</displayName><dynamic>false</dynamic><name>Support Fragmented Transactions</name><required>false</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Transaction Timeout</key><value><description>If the &lt;Support Fragmented Transactions&gt; property is set to true, specifies how long to wait for all FlowFiles for a particular fragment.identifier attribute to arrive before just transferring all of the FlowFiles with that identifier to the 'failure' relationship</description><displayName>Transaction Timeout</displayName><dynamic>false</dynamic><name>Transaction Timeout</name><required>false</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Batch Size</key><value><defaultValue>100</defaultValue><description>The preferred number of FlowFiles to put to the database in a single transaction</description><displayName>Batch Size</displayName><dynamic>false</dynamic><name>Batch Size</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Obtain Generated Keys</key><value><allowableValues><displayName>true</displayName><value>true</value></allowableValues><allowableValues><displayName>false</displayName><value>false</value></allowableValues><defaultValue>false</defaultValue><description>If true, any key that is automatically generated by the database will be added to the FlowFile that generated it using the sql.generate.key attribute. This may result in slightly slower performance and is not supported by all databases.</description><displayName>Obtain Generated Keys</displayName><dynamic>false</dynamic><name>Obtain Generated Keys</name><required>false</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry></descriptors><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>JDBC Connection Pool</key><value>a198e934-4aba-42da-b1ea-cfabd7b03bbc</value></entry><entry><key>Support Fragmented Transactions</key><value>true</value></entry><entry><key>Transaction Timeout</key></entry><entry><key>Batch Size</key><value>100</value></entry><entry><key>Obtain Generated Keys</key><value>false</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>INSERT SQL</name><relationships><autoTerminate>false</autoTerminate><description>A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail, such as an invalid query or an integrity constraint violation</description><name>failure</name></relationships><relationships><autoTerminate>false</autoTerminate><description>A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed</description><name>retry</name></relationships><relationships><autoTerminate>false</autoTerminate><description>A FlowFile is routed to this relationship after the database is successfully updated</description><name>success</name></relationships><state>STOPPED</state><style/><supportsEventDriven>false</supportsEventDriven><supportsParallelProcessing>true</supportsParallelProcessing><type>org.apache.nifi.processors.standard.PutSQL</type></processors></contents><disabledCount>0</disabledCount><inactiveRemotePortCount>0</inactiveRemotePortCount><inputPortCount>1</inputPortCount><invalidCount>2</invalidCount><name>Database Insert</name><outputPortCount>1</outputPortCount><parent><id>23b56e53-a310-48e7-8a15-99d81109bdd9</id><name>NiFi Flow</name></parent><runningCount>0</runningCount><stoppedCount>3</stoppedCount></processGroups></snippet><timestamp>04/19/2016 23:10:03 UTC</timestamp></template>
@tiagodias90

This comment has been minimized.

Copy link

tiagodias90 commented Apr 20, 2016

Congratulations for your contribution!!

what is the input message should I provide?

because I do not understand the "Format INSERT Statement". How to Turn: (s:. ^ * $) on INSERT INTO FlowFiles (uuid, filename, datetime, message) VALUES ( '$ {uuid}', '$ {filename}', '$ {datetime}', '$ {message}')

@sivam1

This comment has been minimized.

Copy link

sivam1 commented Apr 26, 2016

Good Work.
Can you provide a sample( I am new to this NiFi) that reads data from a fixed width file and insert into two different oracle tables based on the content type?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.