@ijokarumawak
Last active December 13, 2023 17:16
NiFi Example: Load CSV File into Table, the traditional and the new way using Record.

NiFi Example: Load CSV file into RDBMS Table using the traditional way and the new way using Record

Example Data

The example flows ingest the following CSV data, generated by GenerateFlowFile:

ID,CITY_NAME,ZIP_CD,STATE_CD
001,CITY_A,1111,AA
002,CITY_B,2222,BB
003,CITY_C,3333,CC
004,CITY_D,4444,DD

Destination Table

create table cities (
  id varchar(4) not null,
  city_name varchar(20),
  zip_cd smallint,
  state_cd varchar(2),
  primary key (id)
);

Traditional way

Up to Apache NiFi 1.2.0, I'd use ConvertCSVToAvro, then ConvertAvroToJSON, and finally ConvertJSONToSQL, with PutSQL executing the generated statements. There may be other ways to load a CSV file using different processors, but each of them requires chaining multiple processors together.

See 1_screenshot.png and the Load_CSV_to_Table.xml template for details.
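The chain of conversions in the traditional flow can be sketched outside NiFi. The following is only a Python analogy under stated assumptions, not NiFi code: each function stands in for one processor, plain dicts stand in for Avro records, and an in-memory SQLite database stands in for the MySQL instance used by the template. All function names are hypothetical.

```python
import csv
import io
import json
import sqlite3

CSV_TEXT = """ID,CITY_NAME,ZIP_CD,STATE_CD
001,CITY_A,1111,AA
002,CITY_B,2222,BB
003,CITY_C,3333,CC
004,CITY_D,4444,DD
"""

# Field names and types, mirroring the record schema used in the template.
SCHEMA = [("ID", str), ("CITY_NAME", str), ("ZIP_CD", int), ("STATE_CD", str)]

# Same table definition as above; SQLite is an assumption made for the sketch.
DDL = """create table cities (
  id varchar(4) not null,
  city_name varchar(20),
  zip_cd smallint,
  state_cd varchar(2),
  primary key (id)
)"""


def csv_to_records(text):
    """ConvertCSVToAvro analogue: skip the header line, apply the schema."""
    rows = [row for row in csv.reader(io.StringIO(text)) if row][1:]
    return [
        {name: cast(value) for (name, cast), value in zip(SCHEMA, row)}
        for row in rows
    ]


def records_to_json(records):
    """ConvertAvroToJSON analogue: serialize all records as one JSON array."""
    return json.dumps(records)


def json_to_sql(json_text, table):
    """ConvertJSONToSQL analogue: one parameterized INSERT per JSON object."""
    statements = []
    for obj in json.loads(json_text):
        columns = ", ".join(name.lower() for name in obj)
        placeholders = ", ".join("?" for _ in obj)
        sql = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"
        statements.append((sql, list(obj.values())))
    return statements


def put_sql(conn, statements):
    """PutSQL analogue: execute the generated statements and commit."""
    for sql, params in statements:
        conn.execute(sql, params)
    conn.commit()


def load_traditional(conn, text=CSV_TEXT):
    """Run all four stages in order; return the number of statements executed."""
    statements = json_to_sql(records_to_json(csv_to_records(text)), "cities")
    put_sql(conn, statements)
    return len(statements)


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute(DDL)
    load_traditional(conn)
    for row in conn.execute("SELECT * FROM cities ORDER BY id"):
        print(row)
```

The point of the sketch is the number of intermediate representations: the same four rows are materialized as typed records, then as JSON text, then as SQL statements before anything reaches the database.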

With Record

Apache NiFi 1.3.0 and later offer a new Record concept. With Record, you can read and write different data formats such as CSV, Avro, and JSON. As shown in this example, several processors were also added to process Records, e.g. PutDatabaseRecord and ConvertRecord.

With Record-aware processors, you no longer have to convert between data formats as we had to before, so you can construct simpler and more efficient data flows.

See 2_screenshot_with_record.png and the Load_CSV_to_Table_with_Record.xml template for details.
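The Record-based flow can be sketched the same way. Again this is a Python analogy rather than NiFi code, with assumptions labeled: `read_records` plays the role of the CSVReader controller service (header line skipped, schema supplied as text), `put_database_record` plays the role of PutDatabaseRecord, and an in-memory SQLite database stands in for MySQL. Both function names are hypothetical.

```python
import csv
import io
import sqlite3

CSV_TEXT = """ID,CITY_NAME,ZIP_CD,STATE_CD
001,CITY_A,1111,AA
002,CITY_B,2222,BB
003,CITY_C,3333,CC
004,CITY_D,4444,DD
"""

# Field names and types, mirroring the schema text configured on the reader.
SCHEMA = [("ID", str), ("CITY_NAME", str), ("ZIP_CD", int), ("STATE_CD", str)]

# Same table definition as above; SQLite is an assumption made for the sketch.
DDL = """create table cities (
  id varchar(4) not null,
  city_name varchar(20),
  zip_cd smallint,
  state_cd varchar(2),
  primary key (id)
)"""


def read_records(text, schema=SCHEMA):
    """CSVReader analogue: ignore the header line and yield typed records."""
    reader = csv.reader(io.StringIO(text))
    next(reader)  # the header is skipped; names and types come from the schema
    for row in reader:
        if row:
            yield {name: cast(value) for (name, cast), value in zip(schema, row)}


def put_database_record(conn, table, records):
    """PutDatabaseRecord analogue: insert all records in one batch."""
    records = list(records)
    if not records:
        return 0
    columns = list(records[0])
    column_list = ", ".join(name.lower() for name in columns)
    placeholders = ", ".join("?" for _ in columns)
    sql = f"INSERT INTO {table} ({column_list}) VALUES ({placeholders})"
    conn.executemany(sql, [[record[name] for name in columns] for record in records])
    conn.commit()
    return len(records)


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute(DDL)
    put_database_record(conn, "cities", read_records(CSV_TEXT))
    for row in conn.execute("SELECT * FROM cities ORDER BY id"):
        print(row)
```

Compared with the traditional chain, the records go from the reader straight into the database without an intermediate JSON or SQL text stage, which is the simplification the Record concept is meant to provide.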

<?xml version="1.0" ?>
<template encoding-version="1.1">
<description></description>
<groupId>73ae6ed8-015e-1000-89f3-94907fd76646</groupId>
<name>Load CSV to Table</name>
<snippet>
<processGroups>
<id>bf62e045-cc1c-337d-0000-000000000000</id>
<parentGroupId>c72c436c-5a63-35af-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>0.0</y>
</position>
<comments></comments>
<contents>
<connections>
<id>b1440bc0-a55b-3e68-0000-000000000000</id>
<parentGroupId>bf62e045-cc1c-337d-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>bf62e045-cc1c-337d-0000-000000000000</groupId>
<id>10924146-4f7a-3544-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<source>
<groupId>bf62e045-cc1c-337d-0000-000000000000</groupId>
<id>5aea32fb-0913-3374-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>cd2361dc-f6b5-30a7-0000-000000000000</id>
<parentGroupId>bf62e045-cc1c-337d-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>1238.639892578125</x>
<y>527.9598999023438</y>
</bends>
<destination>
<groupId>bf62e045-cc1c-337d-0000-000000000000</groupId>
<id>996153a7-af52-3fc9-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>sql</selectedRelationships>
<source>
<groupId>bf62e045-cc1c-337d-0000-000000000000</groupId>
<id>5aea32fb-0913-3374-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>d017284c-fcc9-3cec-0000-000000000000</id>
<parentGroupId>bf62e045-cc1c-337d-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>1160.8798828125</x>
<y>383.9599304199219</y>
</bends>
<destination>
<groupId>bf62e045-cc1c-337d-0000-000000000000</groupId>
<id>5aea32fb-0913-3374-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>bf62e045-cc1c-337d-0000-000000000000</groupId>
<id>30e68bbe-7c4e-3801-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>f0820637-94df-36b4-0000-000000000000</id>
<parentGroupId>bf62e045-cc1c-337d-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>973.6798706054688</x>
<y>238.5199737548828</y>
</bends>
<destination>
<groupId>bf62e045-cc1c-337d-0000-000000000000</groupId>
<id>30e68bbe-7c4e-3801-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>0</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>bf62e045-cc1c-337d-0000-000000000000</groupId>
<id>58ecbb8e-8413-328f-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>41118b95-7754-328f-0000-000000000000</id>
<parentGroupId>bf62e045-cc1c-337d-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>bf62e045-cc1c-337d-0000-000000000000</groupId>
<id>10924146-4f7a-3544-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<selectedRelationships>incompatible</selectedRelationships>
<source>
<groupId>bf62e045-cc1c-337d-0000-000000000000</groupId>
<id>58ecbb8e-8413-328f-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>4c909c34-7018-3873-0000-000000000000</id>
<parentGroupId>bf62e045-cc1c-337d-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>bf62e045-cc1c-337d-0000-000000000000</groupId>
<id>10924146-4f7a-3544-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<source>
<groupId>bf62e045-cc1c-337d-0000-000000000000</groupId>
<id>30e68bbe-7c4e-3801-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>5b2ba57d-b9bc-3d73-0000-000000000000</id>
<parentGroupId>bf62e045-cc1c-337d-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>836.8798828125</x>
<y>87.31999206542969</y>
</bends>
<destination>
<groupId>bf62e045-cc1c-337d-0000-000000000000</groupId>
<id>58ecbb8e-8413-328f-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>0</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>bf62e045-cc1c-337d-0000-000000000000</groupId>
<id>de82a955-f361-3329-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>6014f380-99f9-35b4-0000-000000000000</id>
<parentGroupId>bf62e045-cc1c-337d-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>bf62e045-cc1c-337d-0000-000000000000</groupId>
<id>10924146-4f7a-3544-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<source>
<groupId>bf62e045-cc1c-337d-0000-000000000000</groupId>
<id>996153a7-af52-3fc9-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<controllerServices>
<id>7b2c5a54-22df-3913-0000-000000000000</id>
<parentGroupId>bf62e045-cc1c-337d-0000-000000000000</parentGroupId>
<bundle>
<artifact>nifi-dbcp-service-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<comments></comments>
<descriptors>
<entry>
<key>Database Connection URL</key>
<value>
<name>Database Connection URL</name>
</value>
</entry>
<entry>
<key>Database Driver Class Name</key>
<value>
<name>Database Driver Class Name</name>
</value>
</entry>
<entry>
<key>database-driver-locations</key>
<value>
<name>database-driver-locations</name>
</value>
</entry>
<entry>
<key>Database User</key>
<value>
<name>Database User</name>
</value>
</entry>
<entry>
<key>Password</key>
<value>
<name>Password</name>
</value>
</entry>
<entry>
<key>Max Wait Time</key>
<value>
<name>Max Wait Time</name>
</value>
</entry>
<entry>
<key>Max Total Connections</key>
<value>
<name>Max Total Connections</name>
</value>
</entry>
<entry>
<key>Validation-query</key>
<value>
<name>Validation-query</name>
</value>
</entry>
</descriptors>
<name>DBCPConnectionPool</name>
<persistsState>false</persistsState>
<properties>
<entry>
<key>Database Connection URL</key>
<value>jdbc:mysql://192.168.99.100:3306/nifi_test</value>
</entry>
<entry>
<key>Database Driver Class Name</key>
<value>com.mysql.jdbc.Driver</value>
</entry>
<entry>
<key>database-driver-locations</key>
<value>/Users/koji/Downloads/mysql-connector-java-5.1.41/mysql-connector-java-5.1.41-bin.jar</value>
</entry>
<entry>
<key>Database User</key>
<value>root</value>
</entry>
<entry>
<key>Password</key>
</entry>
<entry>
<key>Max Wait Time</key>
</entry>
<entry>
<key>Max Total Connections</key>
</entry>
<entry>
<key>Validation-query</key>
</entry>
</properties>
<state>ENABLED</state>
<type>org.apache.nifi.dbcp.DBCPConnectionPool</type>
</controllerServices>
<processors>
<id>996153a7-af52-3fc9-0000-000000000000</id>
<parentGroupId>bf62e045-cc1c-337d-0000-000000000000</parentGroupId>
<position>
<x>795.1199438476563</x>
<y>628.7600561523436</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>JDBC Connection Pool</key>
<value>
<identifiesControllerService>org.apache.nifi.dbcp.DBCPService</identifiesControllerService>
<name>JDBC Connection Pool</name>
</value>
</entry>
<entry>
<key>Support Fragmented Transactions</key>
<value>
<name>Support Fragmented Transactions</name>
</value>
</entry>
<entry>
<key>Transaction Timeout</key>
<value>
<name>Transaction Timeout</name>
</value>
</entry>
<entry>
<key>Batch Size</key>
<value>
<name>Batch Size</name>
</value>
</entry>
<entry>
<key>Obtain Generated Keys</key>
<value>
<name>Obtain Generated Keys</name>
</value>
</entry>
<entry>
<key>rollback-on-failure</key>
<value>
<name>rollback-on-failure</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>JDBC Connection Pool</key>
<value>7b2c5a54-22df-3913-0000-000000000000</value>
</entry>
<entry>
<key>Support Fragmented Transactions</key>
<value>true</value>
</entry>
<entry>
<key>Transaction Timeout</key>
</entry>
<entry>
<key>Batch Size</key>
<value>100</value>
</entry>
<entry>
<key>Obtain Generated Keys</key>
<value>false</value>
</entry>
<entry>
<key>rollback-on-failure</key>
<value>false</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>PutSQL</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>retry</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style></style>
<type>org.apache.nifi.processors.standard.PutSQL</type>
</processors>
<processors>
<id>de82a955-f361-3329-0000-000000000000</id>
<parentGroupId>bf62e045-cc1c-337d-0000-000000000000</parentGroupId>
<position>
<x>360.0</x>
<y>27.0</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>File Size</key>
<value>
<name>File Size</name>
</value>
</entry>
<entry>
<key>Batch Size</key>
<value>
<name>Batch Size</name>
</value>
</entry>
<entry>
<key>Data Format</key>
<value>
<name>Data Format</name>
</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>
<name>Unique FlowFiles</name>
</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>
<name>generate-ff-custom-text</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>File Size</key>
<value>0B</value>
</entry>
<entry>
<key>Batch Size</key>
<value>1</value>
</entry>
<entry>
<key>Data Format</key>
<value>Text</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>false</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>ID,CITY_NAME,ZIP_CD,STATE_CD
001,CITY_A,1111,AA
002,CITY_B,2222,BB
003,CITY_C,3333,CC
004,CITY_D,4444,DD</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>1d</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>GenerateFlowFile</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style></style>
<type>org.apache.nifi.processors.standard.GenerateFlowFile</type>
</processors>
<processors>
<id>10924146-4f7a-3544-0000-000000000000</id>
<parentGroupId>bf62e045-cc1c-337d-0000-000000000000</parentGroupId>
<position>
<x>151.44000000000005</x>
<y>602.8399999999999</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Log Level</key>
<value>
<name>Log Level</name>
</value>
</entry>
<entry>
<key>Log Payload</key>
<value>
<name>Log Payload</name>
</value>
</entry>
<entry>
<key>Attributes to Log</key>
<value>
<name>Attributes to Log</name>
</value>
</entry>
<entry>
<key>attributes-to-log-regex</key>
<value>
<name>attributes-to-log-regex</name>
</value>
</entry>
<entry>
<key>Attributes to Ignore</key>
<value>
<name>Attributes to Ignore</name>
</value>
</entry>
<entry>
<key>attributes-to-ignore-regex</key>
<value>
<name>attributes-to-ignore-regex</name>
</value>
</entry>
<entry>
<key>Log prefix</key>
<value>
<name>Log prefix</name>
</value>
</entry>
<entry>
<key>character-set</key>
<value>
<name>character-set</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Log Level</key>
<value>info</value>
</entry>
<entry>
<key>Log Payload</key>
<value>false</value>
</entry>
<entry>
<key>Attributes to Log</key>
</entry>
<entry>
<key>attributes-to-log-regex</key>
<value>.*</value>
</entry>
<entry>
<key>Attributes to Ignore</key>
</entry>
<entry>
<key>attributes-to-ignore-regex</key>
</entry>
<entry>
<key>Log prefix</key>
</entry>
<entry>
<key>character-set</key>
<value>UTF-8</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>LogAttribute</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>STOPPED</state>
<style></style>
<type>org.apache.nifi.processors.standard.LogAttribute</type>
</processors>
<processors>
<id>30e68bbe-7c4e-3801-0000-000000000000</id>
<parentGroupId>bf62e045-cc1c-337d-0000-000000000000</parentGroupId>
<position>
<x>629.520048828125</x>
<y>317.72004150390626</y>
</position>
<bundle>
<artifact>nifi-avro-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>JSON container options</key>
<value>
<name>JSON container options</name>
</value>
</entry>
<entry>
<key>Wrap Single Record</key>
<value>
<name>Wrap Single Record</name>
</value>
</entry>
<entry>
<key>Avro schema</key>
<value>
<name>Avro schema</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>JSON container options</key>
<value>array</value>
</entry>
<entry>
<key>Wrap Single Record</key>
<value>false</value>
</entry>
<entry>
<key>Avro schema</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>ConvertAvroToJSON</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style></style>
<type>org.apache.nifi.processors.avro.ConvertAvroToJSON</type>
</processors>
<processors>
<id>58ecbb8e-8413-328f-0000-000000000000</id>
<parentGroupId>bf62e045-cc1c-337d-0000-000000000000</parentGroupId>
<position>
<x>476.27996826171875</x>
<y>171.9600067138672</y>
</position>
<bundle>
<artifact>nifi-kite-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Hadoop configuration files</key>
<value>
<name>Hadoop configuration files</name>
</value>
</entry>
<entry>
<key>Record schema</key>
<value>
<name>Record schema</name>
</value>
</entry>
<entry>
<key>CSV charset</key>
<value>
<name>CSV charset</name>
</value>
</entry>
<entry>
<key>CSV delimiter</key>
<value>
<name>CSV delimiter</name>
</value>
</entry>
<entry>
<key>CSV quote character</key>
<value>
<name>CSV quote character</name>
</value>
</entry>
<entry>
<key>CSV escape character</key>
<value>
<name>CSV escape character</name>
</value>
</entry>
<entry>
<key>Use CSV header line</key>
<value>
<name>Use CSV header line</name>
</value>
</entry>
<entry>
<key>Lines to skip</key>
<value>
<name>Lines to skip</name>
</value>
</entry>
<entry>
<key>kite-compression-type</key>
<value>
<name>kite-compression-type</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Hadoop configuration files</key>
</entry>
<entry>
<key>Record schema</key>
<value>{
"type": "record",
"namespace": "com.example",
"name": "City",
"fields": [
{ "name": "ID", "type": "string" },
{ "name": "CITY_NAME", "type": "string" },
{ "name": "ZIP_CD", "type": "int" },
{ "name": "STATE_CD", "type": "string" }
]
} </value>
</entry>
<entry>
<key>CSV charset</key>
<value>utf8</value>
</entry>
<entry>
<key>CSV delimiter</key>
<value>,</value>
</entry>
<entry>
<key>CSV quote character</key>
<value>"</value>
</entry>
<entry>
<key>CSV escape character</key>
<value>\</value>
</entry>
<entry>
<key>Use CSV header line</key>
<value>false</value>
</entry>
<entry>
<key>Lines to skip</key>
<value>1</value>
</entry>
<entry>
<key>kite-compression-type</key>
<value>SNAPPY</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>ConvertCSVToAvro</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>incompatible</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style></style>
<type>org.apache.nifi.processors.kite.ConvertCSVToAvro</type>
</processors>
<processors>
<id>5aea32fb-0913-3374-0000-000000000000</id>
<parentGroupId>bf62e045-cc1c-337d-0000-000000000000</parentGroupId>
<position>
<x>730.3200366210938</x>
<y>468.9199682617186</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>JDBC Connection Pool</key>
<value>
<identifiesControllerService>org.apache.nifi.dbcp.DBCPService</identifiesControllerService>
<name>JDBC Connection Pool</name>
</value>
</entry>
<entry>
<key>Statement Type</key>
<value>
<name>Statement Type</name>
</value>
</entry>
<entry>
<key>Table Name</key>
<value>
<name>Table Name</name>
</value>
</entry>
<entry>
<key>Catalog Name</key>
<value>
<name>Catalog Name</name>
</value>
</entry>
<entry>
<key>Schema Name</key>
<value>
<name>Schema Name</name>
</value>
</entry>
<entry>
<key>Translate Field Names</key>
<value>
<name>Translate Field Names</name>
</value>
</entry>
<entry>
<key>Unmatched Field Behavior</key>
<value>
<name>Unmatched Field Behavior</name>
</value>
</entry>
<entry>
<key>Unmatched Column Behavior</key>
<value>
<name>Unmatched Column Behavior</name>
</value>
</entry>
<entry>
<key>Update Keys</key>
<value>
<name>Update Keys</name>
</value>
</entry>
<entry>
<key>jts-quoted-identifiers</key>
<value>
<name>jts-quoted-identifiers</name>
</value>
</entry>
<entry>
<key>jts-quoted-table-identifiers</key>
<value>
<name>jts-quoted-table-identifiers</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>JDBC Connection Pool</key>
<value>7b2c5a54-22df-3913-0000-000000000000</value>
</entry>
<entry>
<key>Statement Type</key>
<value>INSERT</value>
</entry>
<entry>
<key>Table Name</key>
<value>cities</value>
</entry>
<entry>
<key>Catalog Name</key>
</entry>
<entry>
<key>Schema Name</key>
</entry>
<entry>
<key>Translate Field Names</key>
<value>true</value>
</entry>
<entry>
<key>Unmatched Field Behavior</key>
<value>Ignore Unmatched Fields</value>
</entry>
<entry>
<key>Unmatched Column Behavior</key>
<value>Fail on Unmatched Columns</value>
</entry>
<entry>
<key>Update Keys</key>
</entry>
<entry>
<key>jts-quoted-identifiers</key>
<value>false</value>
</entry>
<entry>
<key>jts-quoted-table-identifiers</key>
<value>false</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>ConvertJSONToSQL</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>original</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>sql</name>
</relationships>
<state>RUNNING</state>
<style></style>
<type>org.apache.nifi.processors.standard.ConvertJSONToSQL</type>
</processors>
</contents>
<name>Load CSV to Table</name>
</processGroups>
</snippet>
<timestamp>09/27/2017 11:03:45 JST</timestamp>
</template>
<?xml version="1.0" ?>
<template encoding-version="1.1">
<description></description>
<groupId>73ae6ed8-015e-1000-89f3-94907fd76646</groupId>
<name>Load CSV to Table with Record</name>
<snippet>
<processGroups>
<id>f81a2949-1728-32f5-0000-000000000000</id>
<parentGroupId>c72c436c-5a63-35af-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>0.0</y>
</position>
<comments></comments>
<contents>
<connections>
<id>4688ac05-b1e3-3e8d-0000-000000000000</id>
<parentGroupId>f81a2949-1728-32f5-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>f81a2949-1728-32f5-0000-000000000000</groupId>
<id>39fc92eb-59e0-3e02-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<source>
<groupId>f81a2949-1728-32f5-0000-000000000000</groupId>
<id>ce225866-ef86-3a36-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>6b6877b3-a10d-3536-0000-000000000000</id>
<parentGroupId>f81a2949-1728-32f5-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>847.5253295898438</x>
<y>92.41309356689453</y>
</bends>
<destination>
<groupId>f81a2949-1728-32f5-0000-000000000000</groupId>
<id>ce225866-ef86-3a36-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>0</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>f81a2949-1728-32f5-0000-000000000000</groupId>
<id>e15c2e2b-2638-32ac-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<controllerServices>
<id>dfd8da63-1639-3354-0000-000000000000</id>
<parentGroupId>f81a2949-1728-32f5-0000-000000000000</parentGroupId>
<bundle>
<artifact>nifi-dbcp-service-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<comments></comments>
<descriptors>
<entry>
<key>Database Connection URL</key>
<value>
<name>Database Connection URL</name>
</value>
</entry>
<entry>
<key>Database Driver Class Name</key>
<value>
<name>Database Driver Class Name</name>
</value>
</entry>
<entry>
<key>database-driver-locations</key>
<value>
<name>database-driver-locations</name>
</value>
</entry>
<entry>
<key>Database User</key>
<value>
<name>Database User</name>
</value>
</entry>
<entry>
<key>Password</key>
<value>
<name>Password</name>
</value>
</entry>
<entry>
<key>Max Wait Time</key>
<value>
<name>Max Wait Time</name>
</value>
</entry>
<entry>
<key>Max Total Connections</key>
<value>
<name>Max Total Connections</name>
</value>
</entry>
<entry>
<key>Validation-query</key>
<value>
<name>Validation-query</name>
</value>
</entry>
</descriptors>
<name>DBCPConnectionPool</name>
<persistsState>false</persistsState>
<properties>
<entry>
<key>Database Connection URL</key>
<value>jdbc:mysql://192.168.99.100:3306/nifi_test</value>
</entry>
<entry>
<key>Database Driver Class Name</key>
<value>com.mysql.jdbc.Driver</value>
</entry>
<entry>
<key>database-driver-locations</key>
<value>/Users/koji/Downloads/mysql-connector-java-5.1.41/mysql-connector-java-5.1.41-bin.jar</value>
</entry>
<entry>
<key>Database User</key>
<value>root</value>
</entry>
<entry>
<key>Password</key>
</entry>
<entry>
<key>Max Wait Time</key>
</entry>
<entry>
<key>Max Total Connections</key>
</entry>
<entry>
<key>Validation-query</key>
</entry>
</properties>
<state>ENABLED</state>
<type>org.apache.nifi.dbcp.DBCPConnectionPool</type>
</controllerServices>
<controllerServices>
<id>02a8d039-7d23-371b-0000-000000000000</id>
<parentGroupId>f81a2949-1728-32f5-0000-000000000000</parentGroupId>
<bundle>
<artifact>nifi-record-serialization-services-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<comments></comments>
<descriptors>
<entry>
<key>schema-access-strategy</key>
<value>
<name>schema-access-strategy</name>
</value>
</entry>
<entry>
<key>schema-registry</key>
<value>
<identifiesControllerService>org.apache.nifi.schemaregistry.services.SchemaRegistry</identifiesControllerService>
<name>schema-registry</name>
</value>
</entry>
<entry>
<key>schema-name</key>
<value>
<name>schema-name</name>
</value>
</entry>
<entry>
<key>schema-text</key>
<value>
<name>schema-text</name>
</value>
</entry>
<entry>
<key>Date Format</key>
<value>
<name>Date Format</name>
</value>
</entry>
<entry>
<key>Time Format</key>
<value>
<name>Time Format</name>
</value>
</entry>
<entry>
<key>Timestamp Format</key>
<value>
<name>Timestamp Format</name>
</value>
</entry>
<entry>
<key>CSV Format</key>
<value>
<name>CSV Format</name>
</value>
</entry>
<entry>
<key>Value Separator</key>
<value>
<name>Value Separator</name>
</value>
</entry>
<entry>
<key>Skip Header Line</key>
<value>
<name>Skip Header Line</name>
</value>
</entry>
<entry>
<key>ignore-csv-header</key>
<value>
<name>ignore-csv-header</name>
</value>
</entry>
<entry>
<key>Quote Character</key>
<value>
<name>Quote Character</name>
</value>
</entry>
<entry>
<key>Escape Character</key>
<value>
<name>Escape Character</name>
</value>
</entry>
<entry>
<key>Comment Marker</key>
<value>
<name>Comment Marker</name>
</value>
</entry>
<entry>
<key>Null String</key>
<value>
<name>Null String</name>
</value>
</entry>
<entry>
<key>Trim Fields</key>
<value>
<name>Trim Fields</name>
</value>
</entry>
</descriptors>
<name>CSVReader</name>
<persistsState>false</persistsState>
<properties>
<entry>
<key>schema-access-strategy</key>
<value>schema-text-property</value>
</entry>
<entry>
<key>schema-registry</key>
</entry>
<entry>
<key>schema-name</key>
</entry>
<entry>
<key>schema-text</key>
<value>{
"type": "record",
"namespace": "com.example",
"name": "City",
"fields": [
{ "name": "ID", "type": "string" },
{ "name": "CITY_NAME", "type": "string" },
{ "name": "ZIP_CD", "type": "int" },
{ "name": "STATE_CD", "type": "string" }
]
}</value>
</entry>
<entry>
<key>Date Format</key>
</entry>
<entry>
<key>Time Format</key>
</entry>
<entry>
<key>Timestamp Format</key>
</entry>
<entry>
<key>CSV Format</key>
</entry>
<entry>
<key>Value Separator</key>
</entry>
<entry>
<key>Skip Header Line</key>
<value>true</value>
</entry>
<entry>
<key>ignore-csv-header</key>
<value>true</value>
</entry>
<entry>
<key>Quote Character</key>
</entry>
<entry>
<key>Escape Character</key>
</entry>
<entry>
<key>Comment Marker</key>
</entry>
<entry>
<key>Null String</key>
</entry>
<entry>
<key>Trim Fields</key>
</entry>
</properties>
<state>ENABLED</state>
<type>org.apache.nifi.csv.CSVReader</type>
</controllerServices>
<processors>
<id>ce225866-ef86-3a36-0000-000000000000</id>
<parentGroupId>f81a2949-1728-32f5-0000-000000000000</parentGroupId>
<position>
<x>536.2390607198079</x>
<y>173.88254214555786</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>put-db-record-record-reader</key>
<value>
<identifiesControllerService>org.apache.nifi.serialization.RecordReaderFactory</identifiesControllerService>
<name>put-db-record-record-reader</name>
</value>
</entry>
<entry>
<key>put-db-record-statement-type</key>
<value>
<name>put-db-record-statement-type</name>
</value>
</entry>
<entry>
<key>put-db-record-dcbp-service</key>
<value>
<identifiesControllerService>org.apache.nifi.dbcp.DBCPService</identifiesControllerService>
<name>put-db-record-dcbp-service</name>
</value>
</entry>
<entry>
<key>put-db-record-catalog-name</key>
<value>
<name>put-db-record-catalog-name</name>
</value>
</entry>
<entry>
<key>put-db-record-schema-name</key>
<value>
<name>put-db-record-schema-name</name>
</value>
</entry>
<entry>
<key>put-db-record-table-name</key>
<value>
<name>put-db-record-table-name</name>
</value>
</entry>
<entry>
<key>put-db-record-translate-field-names</key>
<value>
<name>put-db-record-translate-field-names</name>
</value>
</entry>
<entry>
<key>put-db-record-unmatched-field-behavior</key>
<value>
<name>put-db-record-unmatched-field-behavior</name>
</value>
</entry>
<entry>
<key>put-db-record-unmatched-column-behavior</key>
<value>
<name>put-db-record-unmatched-column-behavior</name>
</value>
</entry>
<entry>
<key>put-db-record-update-keys</key>
<value>
<name>put-db-record-update-keys</name>
</value>
</entry>
<entry>
<key>put-db-record-field-containing-sql</key>
<value>
<name>put-db-record-field-containing-sql</name>
</value>
</entry>
<entry>
<key>put-db-record-quoted-identifiers</key>
<value>
<name>put-db-record-quoted-identifiers</name>
</value>
</entry>
<entry>
<key>put-db-record-quoted-table-identifiers</key>
<value>
<name>put-db-record-quoted-table-identifiers</name>
</value>
</entry>
<entry>
<key>put-db-record-query-timeout</key>
<value>
<name>put-db-record-query-timeout</name>
</value>
</entry>
<entry>
<key>rollback-on-failure</key>
<value>
<name>rollback-on-failure</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>put-db-record-record-reader</key>
<value>02a8d039-7d23-371b-0000-000000000000</value>
</entry>
<entry>
<key>put-db-record-statement-type</key>
<value>INSERT</value>
</entry>
<entry>
<key>put-db-record-dcbp-service</key>
<value>dfd8da63-1639-3354-0000-000000000000</value>
</entry>
<entry>
<key>put-db-record-catalog-name</key>
</entry>
<entry>
<key>put-db-record-schema-name</key>
</entry>
<entry>
<key>put-db-record-table-name</key>
<value>cities</value>
</entry>
<entry>
<key>put-db-record-translate-field-names</key>
<value>true</value>
</entry>
<entry>
<key>put-db-record-unmatched-field-behavior</key>
<value>Ignore Unmatched Fields</value>
</entry>
<entry>
<key>put-db-record-unmatched-column-behavior</key>
<value>Fail on Unmatched Columns</value>
</entry>
<entry>
<key>put-db-record-update-keys</key>
</entry>
<entry>
<key>put-db-record-field-containing-sql</key>
</entry>
<entry>
<key>put-db-record-quoted-identifiers</key>
<value>false</value>
</entry>
<entry>
<key>put-db-record-quoted-table-identifiers</key>
<value>false</value>
</entry>
<entry>
<key>put-db-record-query-timeout</key>
<value>0 seconds</value>
</entry>
<entry>
<key>rollback-on-failure</key>
<value>false</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>PutDatabaseRecord</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>retry</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style></style>
<type>org.apache.nifi.processors.standard.PutDatabaseRecord</type>
</processors>
<processors>
<id>e15c2e2b-2638-32ac-0000-000000000000</id>
<parentGroupId>f81a2949-1728-32f5-0000-000000000000</parentGroupId>
<position>
<x>360.0</x>
<y>27.0</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>File Size</key>
<value>
<name>File Size</name>
</value>
</entry>
<entry>
<key>Batch Size</key>
<value>
<name>Batch Size</name>
</value>
</entry>
<entry>
<key>Data Format</key>
<value>
<name>Data Format</name>
</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>
<name>Unique FlowFiles</name>
</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>
<name>generate-ff-custom-text</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>File Size</key>
<value>0B</value>
</entry>
<entry>
<key>Batch Size</key>
<value>1</value>
</entry>
<entry>
<key>Data Format</key>
<value>Text</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>false</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>ID,CITY_NAME,ZIP_CD,STATE_CD
001,CITY_A,1111,AA
002,CITY_B,2222,BB
003,CITY_C,3333,CC
004,CITY_D,4444,DD</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>1d</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>GenerateFlowFile</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style></style>
<type>org.apache.nifi.processors.standard.GenerateFlowFile</type>
</processors>
<processors>
<id>39fc92eb-59e0-3e02-0000-000000000000</id>
<parentGroupId>f81a2949-1728-32f5-0000-000000000000</parentGroupId>
<position>
<x>352.07363647460943</x>
<y>420.44577392578117</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Log Level</key>
<value>
<name>Log Level</name>
</value>
</entry>
<entry>
<key>Log Payload</key>
<value>
<name>Log Payload</name>
</value>
</entry>
<entry>
<key>Attributes to Log</key>
<value>
<name>Attributes to Log</name>
</value>
</entry>
<entry>
<key>attributes-to-log-regex</key>
<value>
<name>attributes-to-log-regex</name>
</value>
</entry>
<entry>
<key>Attributes to Ignore</key>
<value>
<name>Attributes to Ignore</name>
</value>
</entry>
<entry>
<key>attributes-to-ignore-regex</key>
<value>
<name>attributes-to-ignore-regex</name>
</value>
</entry>
<entry>
<key>Log prefix</key>
<value>
<name>Log prefix</name>
</value>
</entry>
<entry>
<key>character-set</key>
<value>
<name>character-set</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Log Level</key>
<value>info</value>
</entry>
<entry>
<key>Log Payload</key>
<value>false</value>
</entry>
<entry>
<key>Attributes to Log</key>
</entry>
<entry>
<key>attributes-to-log-regex</key>
<value>.*</value>
</entry>
<entry>
<key>Attributes to Ignore</key>
</entry>
<entry>
<key>attributes-to-ignore-regex</key>
</entry>
<entry>
<key>Log prefix</key>
</entry>
<entry>
<key>character-set</key>
<value>UTF-8</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>LogAttribute</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>STOPPED</state>
<style></style>
<type>org.apache.nifi.processors.standard.LogAttribute</type>
</processors>
</contents>
<name>Load CSV to Table with Record</name>
</processGroups>
</snippet>
<timestamp>09/27/2017 11:03:56 JST</timestamp>
</template>
<groupId>f81a2949-1728-32f5-0000-000000000000</groupId>
<id>e15c2e2b-2638-32ac-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<controllerServices>
<id>dfd8da63-1639-3354-0000-000000000000</id>
<parentGroupId>f81a2949-1728-32f5-0000-000000000000</parentGroupId>
<bundle>
<artifact>nifi-dbcp-service-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<comments></comments>
<descriptors>
<entry>
<key>Database Connection URL</key>
<value>
<name>Database Connection URL</name>
</value>
</entry>
<entry>
<key>Database Driver Class Name</key>
<value>
<name>Database Driver Class Name</name>
</value>
</entry>
<entry>
<key>database-driver-locations</key>
<value>
<name>database-driver-locations</name>
</value>
</entry>
<entry>
<key>Database User</key>
<value>
<name>Database User</name>
</value>
</entry>
<entry>
<key>Password</key>
<value>
<name>Password</name>
</value>
</entry>
<entry>
<key>Max Wait Time</key>
<value>
<name>Max Wait Time</name>
</value>
</entry>
<entry>
<key>Max Total Connections</key>
<value>
<name>Max Total Connections</name>
</value>
</entry>
<entry>
<key>Validation-query</key>
<value>
<name>Validation-query</name>
</value>
</entry>
</descriptors>
<name>DBCPConnectionPool</name>
<persistsState>false</persistsState>
<properties>
<entry>
<key>Database Connection URL</key>
<value>jdbc:mysql://192.168.99.100:3306/nifi_test</value>
</entry>
<entry>
<key>Database Driver Class Name</key>
<value>com.mysql.jdbc.Driver</value>
</entry>
<entry>
<key>database-driver-locations</key>
<value>/Users/koji/Downloads/mysql-connector-java-5.1.41/mysql-connector-java-5.1.41-bin.jar</value>
</entry>
<entry>
<key>Database User</key>
<value>root</value>
</entry>
<entry>
<key>Password</key>
</entry>
<entry>
<key>Max Wait Time</key>
</entry>
<entry>
<key>Max Total Connections</key>
</entry>
<entry>
<key>Validation-query</key>
</entry>
</properties>
<state>ENABLED</state>
<type>org.apache.nifi.dbcp.DBCPConnectionPool</type>
</controllerServices>
<controllerServices>
<id>02a8d039-7d23-371b-0000-000000000000</id>
<parentGroupId>f81a2949-1728-32f5-0000-000000000000</parentGroupId>
<bundle>
<artifact>nifi-record-serialization-services-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<comments></comments>
<descriptors>
<entry>
<key>schema-access-strategy</key>
<value>
<name>schema-access-strategy</name>
</value>
</entry>
<entry>
<key>schema-registry</key>
<value>
<identifiesControllerService>org.apache.nifi.schemaregistry.services.SchemaRegistry</identifiesControllerService>
<name>schema-registry</name>
</value>
</entry>
<entry>
<key>schema-name</key>
<value>
<name>schema-name</name>
</value>
</entry>
<entry>
<key>schema-text</key>
<value>
<name>schema-text</name>
</value>
</entry>
<entry>
<key>Date Format</key>
<value>
<name>Date Format</name>
</value>
</entry>
<entry>
<key>Time Format</key>
<value>
<name>Time Format</name>
</value>
</entry>
<entry>
<key>Timestamp Format</key>
<value>
<name>Timestamp Format</name>
</value>
</entry>
<entry>
<key>CSV Format</key>
<value>
<name>CSV Format</name>
</value>
</entry>
<entry>
<key>Value Separator</key>
<value>
<name>Value Separator</name>
</value>
</entry>
<entry>
<key>Skip Header Line</key>
<value>
<name>Skip Header Line</name>
</value>
</entry>
<entry>
<key>ignore-csv-header</key>
<value>
<name>ignore-csv-header</name>
</value>
</entry>
<entry>
<key>Quote Character</key>
<value>
<name>Quote Character</name>
</value>
</entry>
<entry>
<key>Escape Character</key>
<value>
<name>Escape Character</name>
</value>
</entry>
<entry>
<key>Comment Marker</key>
<value>
<name>Comment Marker</name>
</value>
</entry>
<entry>
<key>Null String</key>
<value>
<name>Null String</name>
</value>
</entry>
<entry>
<key>Trim Fields</key>
<value>
<name>Trim Fields</name>
</value>
</entry>
</descriptors>
<name>CSVReader</name>
<persistsState>false</persistsState>
<properties>
<entry>
<key>schema-access-strategy</key>
<value>schema-text-property</value>
</entry>
<entry>
<key>schema-registry</key>
</entry>
<entry>
<key>schema-name</key>
</entry>
<entry>
<key>schema-text</key>
<value>{
"type": "record",
"namespace": "com.example",
"name": "City",
"fields": [
{ "name": "ID", "type": "string" },
{ "name": "CITY_NAME", "type": "string" },
{ "name": "ZIP_CD", "type": "int" },
{ "name": "STATE_CD", "type": "string" }
]
}</value>
</entry>
<entry>
<key>Date Format</key>
</entry>
<entry>
<key>Time Format</key>
</entry>
<entry>
<key>Timestamp Format</key>
</entry>
<entry>
<key>CSV Format</key>
</entry>
<entry>
<key>Value Separator</key>
</entry>
<entry>
<key>Skip Header Line</key>
<value>true</value>
</entry>
<entry>
<key>ignore-csv-header</key>
<value>true</value>
</entry>
<entry>
<key>Quote Character</key>
</entry>
<entry>
<key>Escape Character</key>
</entry>
<entry>
<key>Comment Marker</key>
</entry>
<entry>
<key>Null String</key>
</entry>
<entry>
<key>Trim Fields</key>
</entry>
</properties>
<state>ENABLED</state>
<type>org.apache.nifi.csv.CSVReader</type>
</controllerServices>
<processors>
<id>ce225866-ef86-3a36-0000-000000000000</id>
<parentGroupId>f81a2949-1728-32f5-0000-000000000000</parentGroupId>
<position>
<x>536.2390607198079</x>
<y>173.88254214555786</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>put-db-record-record-reader</key>
<value>
<identifiesControllerService>org.apache.nifi.serialization.RecordReaderFactory</identifiesControllerService>
<name>put-db-record-record-reader</name>
</value>
</entry>
<entry>
<key>put-db-record-statement-type</key>
<value>
<name>put-db-record-statement-type</name>
</value>
</entry>
<entry>
<key>put-db-record-dcbp-service</key>
<value>
<identifiesControllerService>org.apache.nifi.dbcp.DBCPService</identifiesControllerService>
<name>put-db-record-dcbp-service</name>
</value>
</entry>
<entry>
<key>put-db-record-catalog-name</key>
<value>
<name>put-db-record-catalog-name</name>
</value>
</entry>
<entry>
<key>put-db-record-schema-name</key>
<value>
<name>put-db-record-schema-name</name>
</value>
</entry>
<entry>
<key>put-db-record-table-name</key>
<value>
<name>put-db-record-table-name</name>
</value>
</entry>
<entry>
<key>put-db-record-translate-field-names</key>
<value>
<name>put-db-record-translate-field-names</name>
</value>
</entry>
<entry>
<key>put-db-record-unmatched-field-behavior</key>
<value>
<name>put-db-record-unmatched-field-behavior</name>
</value>
</entry>
<entry>
<key>put-db-record-unmatched-column-behavior</key>
<value>
<name>put-db-record-unmatched-column-behavior</name>
</value>
</entry>
<entry>
<key>put-db-record-update-keys</key>
<value>
<name>put-db-record-update-keys</name>
</value>
</entry>
<entry>
<key>put-db-record-field-containing-sql</key>
<value>
<name>put-db-record-field-containing-sql</name>
</value>
</entry>
<entry>
<key>put-db-record-quoted-identifiers</key>
<value>
<name>put-db-record-quoted-identifiers</name>
</value>
</entry>
<entry>
<key>put-db-record-quoted-table-identifiers</key>
<value>
<name>put-db-record-quoted-table-identifiers</name>
</value>
</entry>
<entry>
<key>put-db-record-query-timeout</key>
<value>
<name>put-db-record-query-timeout</name>
</value>
</entry>
<entry>
<key>rollback-on-failure</key>
<value>
<name>rollback-on-failure</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>put-db-record-record-reader</key>
<value>02a8d039-7d23-371b-0000-000000000000</value>
</entry>
<entry>
<key>put-db-record-statement-type</key>
<value>INSERT</value>
</entry>
<entry>
<key>put-db-record-dcbp-service</key>
<value>dfd8da63-1639-3354-0000-000000000000</value>
</entry>
<entry>
<key>put-db-record-catalog-name</key>
</entry>
<entry>
<key>put-db-record-schema-name</key>
</entry>
<entry>
<key>put-db-record-table-name</key>
<value>cities</value>
</entry>
<entry>
<key>put-db-record-translate-field-names</key>
<value>true</value>
</entry>
<entry>
<key>put-db-record-unmatched-field-behavior</key>
<value>Ignore Unmatched Fields</value>
</entry>
<entry>
<key>put-db-record-unmatched-column-behavior</key>
<value>Fail on Unmatched Columns</value>
</entry>
<entry>
<key>put-db-record-update-keys</key>
</entry>
<entry>
<key>put-db-record-field-containing-sql</key>
</entry>
<entry>
<key>put-db-record-quoted-identifiers</key>
<value>false</value>
</entry>
<entry>
<key>put-db-record-quoted-table-identifiers</key>
<value>false</value>
</entry>
<entry>
<key>put-db-record-query-timeout</key>
<value>0 seconds</value>
</entry>
<entry>
<key>rollback-on-failure</key>
<value>false</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>PutDatabaseRecord</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>retry</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style></style>
<type>org.apache.nifi.processors.standard.PutDatabaseRecord</type>
</processors>
<processors>
<id>e15c2e2b-2638-32ac-0000-000000000000</id>
<parentGroupId>f81a2949-1728-32f5-0000-000000000000</parentGroupId>
<position>
<x>360.0</x>
<y>27.0</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>File Size</key>
<value>
<name>File Size</name>
</value>
</entry>
<entry>
<key>Batch Size</key>
<value>
<name>Batch Size</name>
</value>
</entry>
<entry>
<key>Data Format</key>
<value>
<name>Data Format</name>
</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>
<name>Unique FlowFiles</name>
</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>
<name>generate-ff-custom-text</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>File Size</key>
<value>0B</value>
</entry>
<entry>
<key>Batch Size</key>
<value>1</value>
</entry>
<entry>
<key>Data Format</key>
<value>Text</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>false</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>ID,CITY_NAME,ZIP_CD,STATE_CD
001,CITY_A,1111,AA
002,CITY_B,2222,BB
003,CITY_C,3333,CC
004,CITY_D,4444,DD</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>1d</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>GenerateFlowFile</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style></style>
<type>org.apache.nifi.processors.standard.GenerateFlowFile</type>
</processors>
<processors>
<id>39fc92eb-59e0-3e02-0000-000000000000</id>
<parentGroupId>f81a2949-1728-32f5-0000-000000000000</parentGroupId>
<position>
<x>352.07363647460943</x>
<y>420.44577392578117</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Log Level</key>
<value>
<name>Log Level</name>
</value>
</entry>
<entry>
<key>Log Payload</key>
<value>
<name>Log Payload</name>
</value>
</entry>
<entry>
<key>Attributes to Log</key>
<value>
<name>Attributes to Log</name>
</value>
</entry>
<entry>
<key>attributes-to-log-regex</key>
<value>
<name>attributes-to-log-regex</name>
</value>
</entry>
<entry>
<key>Attributes to Ignore</key>
<value>
<name>Attributes to Ignore</name>
</value>
</entry>
<entry>
<key>attributes-to-ignore-regex</key>
<value>
<name>attributes-to-ignore-regex</name>
</value>
</entry>
<entry>
<key>Log prefix</key>
<value>
<name>Log prefix</name>
</value>
</entry>
<entry>
<key>character-set</key>
<value>
<name>character-set</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Log Level</key>
<value>info</value>
</entry>
<entry>
<key>Log Payload</key>
<value>false</value>
</entry>
<entry>
<key>Attributes to Log</key>
</entry>
<entry>
<key>attributes-to-log-regex</key>
<value>.*</value>
</entry>
<entry>
<key>Attributes to Ignore</key>
</entry>
<entry>
<key>attributes-to-ignore-regex</key>
</entry>
<entry>
<key>Log prefix</key>
</entry>
<entry>
<key>character-set</key>
<value>UTF-8</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>LogAttribute</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>STOPPED</state>
<style></style>
<type>org.apache.nifi.processors.standard.LogAttribute</type>
</processors>
</contents>
<name>Load CSV to Table with Record</name>
</processGroups>
</snippet>
<timestamp>09/27/2017 11:03:56 JST</timestamp>
</template>
@lingtze1

Is there a way to do an "UPSERT" in NiFi for this kind of scenario? That is, do an insert, but when a key conflicts with a record already in the table, do an update instead.

@pmengaziol

Tried to load the second 'with record' template and got ->
The specified template is not in a valid format.
NiFi 1.5.0 running on Win7

@miten4u

miten4u commented Apr 9, 2018

@lingtze1
For UPSERT, there doesn't seem to be a direct way, but you can try the following:
For each record, run a SQL SELECT query using ExecuteSQL.
Use the FlowFile attribute executesql.row.count, which gives the number of records found.
If there are no records, we need to insert.
If the count is greater than 0, it's an update.
You will have to play with RouteOnAttribute.
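
The decision logic in the steps above can be sketched outside NiFi. This is only an illustration, not a NiFi flow: it uses Python's built-in `sqlite3` module with an in-memory database instead of MySQL, and the function names `upsert_city` and `make_demo_db` are hypothetical. The SELECT stands in for ExecuteSQL, the row count for `executesql.row.count`, and the `if` for RouteOnAttribute.

```python
import sqlite3


def make_demo_db():
    """Create an in-memory database with the cities table from this gist."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "create table cities ("
        " id varchar(4) not null,"
        " city_name varchar(20),"
        " zip_cd smallint,"
        " state_cd varchar(2),"
        " primary key (id))"
    )
    return conn


def upsert_city(conn, city):
    """Insert the record, or update it when its ID is already in the table."""
    # Step 1: look up the key (the role ExecuteSQL plays in the flow).
    rows = conn.execute(
        "SELECT id FROM cities WHERE id = ?", (city["id"],)
    ).fetchall()
    row_count = len(rows)  # stands in for executesql.row.count

    # Step 2: branch on the count (the role RouteOnAttribute plays in the flow).
    if row_count == 0:
        conn.execute(
            "INSERT INTO cities (id, city_name, zip_cd, state_cd)"
            " VALUES (?, ?, ?, ?)",
            (city["id"], city["city_name"], city["zip_cd"], city["state_cd"]),
        )
        return "insert"

    conn.execute(
        "UPDATE cities SET city_name = ?, zip_cd = ?, state_cd = ? WHERE id = ?",
        (city["city_name"], city["zip_cd"], city["state_cd"], city["id"]),
    )
    return "update"
```

Note that a check followed by a write like this is not atomic, so two concurrent writers could both see a count of 0 for the same key.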

@lingtze1

I have tried importing "Load_CSV_to_Table_with_Record.xml" into NiFi 1.2 and 1.6, and it says "The specified template is not in a valid format."
I tried both copying the content into an XML file myself and extracting the file from the zip; both failed.

@marcushvega

@lingtze1
I got it to import into NiFi 1.4 by copying and pasting the inner template of "Load_CSV_to_Table_with_Record.xml" into a new document and then loading that document.

Lines 56 through 789
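
The copy-and-paste step described above could also be scripted. The following is a minimal sketch, assuming the downloaded file really does hold the inner template on lines 56 through 789 as reported in this comment; the helper names and the output file name are hypothetical. It checks that the extracted range parses as XML before writing it out.

```python
import xml.etree.ElementTree as ET


def extract_line_range(text, first, last):
    """Return lines first..last (1-indexed, inclusive) of text as one string."""
    lines = text.splitlines()
    return "\n".join(lines[first - 1:last]) + "\n"


def is_well_formed(xml_text):
    """Return True when xml_text parses as a single XML document."""
    try:
        ET.fromstring(xml_text)
        return True
    except ET.ParseError:
        return False


def extract_inner_template(src_path, dst_path, first=56, last=789):
    """Copy the given line range of src_path into dst_path if it is valid XML."""
    with open(src_path, encoding="utf-8") as src:
        inner = extract_line_range(src.read(), first, last)
    if not is_well_formed(inner):
        raise ValueError("lines %d-%d are not a well-formed XML document" % (first, last))
    with open(dst_path, "w", encoding="utf-8") as dst:
        dst.write(inner)
```

For example, `extract_inner_template("Load_CSV_to_Table_with_Record.xml", "inner_template.xml")` would write the inner template to a new file (the second name is an arbitrary choice) that can then be uploaded to NiFi.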

@demobo-com

NiFi 1.6: it says "The specified template is not in a valid format."

@xCozmox

xCozmox commented Jun 7, 2019

Load_CSV_to_Table_with_Record.xml cannot be imported into the latest NiFi version.

@khadafi-hub

@marcushvega Thank you, sir, it works for me.

@Amruth484

How do I insert particular content from a JSON file into a MySQL table using NiFi? I'm getting an error saying that none of the fields map to the columns specified in the table. Can anyone tell me which processors I need to use?

@eodenyire

The last template doesn't work
