NiFi Example: Load CSV file into RDBMS Table using the traditional way and the new way using Record

Example Data

The following CSV is ingested into the example flows by GenerateFlowFile:

ID,CITY_NAME,ZIP_CD,STATE_CD
001,CITY_A,1111,AA
002,CITY_B,2222,BB
003,CITY_C,3333,CC
004,CITY_D,4444,DD

Destination Table

create table cities (
  id varchar(4) not null,
  city_name varchar(20),
  zip_cd smallint,
  state_cd varchar(2),
  primary key (id)
);

Traditional way

Up to Apache NiFi ver 1.2.0, I'd use ConvertCSVToAvro, then ConvertAvroToJSON, then ConvertJSONToSQL, and finally PutSQL to execute the generated statements. There may be other ways to load a CSV file with different processors, but in any case you need to chain multiple processors together.

See 1_screenshot.png and the Load_CSV_to_Table.xml template for details.
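
To make the chain of conversions concrete, here is a minimal Python sketch of the same steps outside NiFi. It is not how the processors are implemented; each function only mimics the role of one processor in the template. The function names are made up for illustration, and an in-memory SQLite database stands in for the MySQL instance used by the template.

```python
import csv
import io
import json
import sqlite3

CSV_TEXT = """ID,CITY_NAME,ZIP_CD,STATE_CD
001,CITY_A,1111,AA
002,CITY_B,2222,BB
003,CITY_C,3333,CC
004,CITY_D,4444,DD
"""

# Field names and types mirror the Avro schema used in the template.
SCHEMA = [("ID", str), ("CITY_NAME", str), ("ZIP_CD", int), ("STATE_CD", str)]


def csv_to_typed_records(text):
    """Role of ConvertCSVToAvro: skip the header line and apply the schema types."""
    rows = list(csv.reader(io.StringIO(text)))[1:]
    return [
        {name: cast(value) for (name, cast), value in zip(SCHEMA, row)}
        for row in rows
    ]


def records_to_json(records):
    """Role of ConvertAvroToJSON: serialize the records as a JSON array."""
    return json.dumps(records)


def json_to_sql(json_text, table):
    """Role of ConvertJSONToSQL: build one parameterized INSERT per JSON object."""
    statements = []
    for obj in json.loads(json_text):
        columns = ", ".join(obj)
        placeholders = ", ".join("?" for _ in obj)
        sql = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"
        statements.append((sql, list(obj.values())))
    return statements


def put_sql(conn, statements):
    """Role of PutSQL: execute each statement with its parameters."""
    for sql, params in statements:
        conn.execute(sql, params)
    conn.commit()


# Assumption: SQLite in memory replaces the MySQL database from the template.
conn = sqlite3.connect(":memory:")
conn.execute(
    "create table cities (id varchar(4) not null, city_name varchar(20), "
    "zip_cd smallint, state_cd varchar(2), primary key (id))"
)

statements = json_to_sql(records_to_json(csv_to_typed_records(CSV_TEXT)), "cities")
put_sql(conn, statements)
rows = conn.execute(
    "select id, city_name, zip_cd, state_cd from cities order by id"
).fetchall()
```

Every step re-serializes the whole data set into another format, which is the overhead the Record approach avoids.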

With Record

Since Apache NiFi ver 1.3.0, a new Record concept has been introduced. With Record, you can read and write different data formats such as CSV, Avro, and JSON. Several processors were also added to process Records, e.g. PutDatabaseRecord (used in this example) and ConvertRecord.

With Record-aware processors, you no longer have to convert between data formats as we had to do before. A reader controller service (here, CSVReader) parses the incoming data into Records, and the processor works on those Records directly, so you can construct simpler and more efficient data flows.

See 2_screenshot_with_record.png and the Load_CSV_to_Table_with_Record.xml template for details.
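
As a rough illustration of the idea, the Python sketch below separates a reader that yields typed records from a writer that inserts them directly, with no intermediate Avro or JSON step. It only mimics the division of labour between CSVReader and PutDatabaseRecord and is not NiFi's implementation. The function names `csv_reader` and `put_database_record` are hypothetical, and an in-memory SQLite database again stands in for MySQL.

```python
import csv
import io
import sqlite3

CSV_TEXT = """ID,CITY_NAME,ZIP_CD,STATE_CD
001,CITY_A,1111,AA
002,CITY_B,2222,BB
003,CITY_C,3333,CC
004,CITY_D,4444,DD
"""

# Field names and types mirror the schema text configured on the CSVReader.
SCHEMA = [("ID", str), ("CITY_NAME", str), ("ZIP_CD", int), ("STATE_CD", str)]


def csv_reader(text, schema, skip_header=True):
    """Yield one typed record (a dict) per CSV line, like a record reader."""
    rows = csv.reader(io.StringIO(text))
    if skip_header:
        next(rows, None)
    for row in rows:
        yield {name: cast(value) for (name, cast), value in zip(schema, row)}


def put_database_record(conn, table, records, schema):
    """Insert records straight from the reader using one prepared statement."""
    columns = [name for name, _ in schema]
    sql = "INSERT INTO {} ({}) VALUES ({})".format(
        table, ", ".join(columns), ", ".join("?" for _ in columns)
    )
    conn.executemany(sql, ([record[c] for c in columns] for record in records))
    conn.commit()


# Assumption: SQLite in memory replaces the MySQL database from the template.
conn = sqlite3.connect(":memory:")
conn.execute(
    "create table cities (id varchar(4) not null, city_name varchar(20), "
    "zip_cd smallint, state_cd varchar(2), primary key (id))"
)

put_database_record(conn, "cities", csv_reader(CSV_TEXT, SCHEMA), SCHEMA)
rows = conn.execute(
    "select id, city_name, zip_cd, state_cd from cities order by id"
).fetchall()
```

Swapping the input format would only mean swapping the reader; the writer stays unchanged, which is the main appeal of the Record abstraction.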

<?xml version="1.0" ?>
<template encoding-version="1.1">
<description></description>
<groupId>73ae6ed8-015e-1000-89f3-94907fd76646</groupId>
<name>Load CSV to Table</name>
<snippet>
<processGroups>
<id>bf62e045-cc1c-337d-0000-000000000000</id>
<parentGroupId>c72c436c-5a63-35af-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>0.0</y>
</position>
<comments></comments>
<contents>
<connections>
<id>b1440bc0-a55b-3e68-0000-000000000000</id>
<parentGroupId>bf62e045-cc1c-337d-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>bf62e045-cc1c-337d-0000-000000000000</groupId>
<id>10924146-4f7a-3544-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<source>
<groupId>bf62e045-cc1c-337d-0000-000000000000</groupId>
<id>5aea32fb-0913-3374-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>cd2361dc-f6b5-30a7-0000-000000000000</id>
<parentGroupId>bf62e045-cc1c-337d-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>1238.639892578125</x>
<y>527.9598999023438</y>
</bends>
<destination>
<groupId>bf62e045-cc1c-337d-0000-000000000000</groupId>
<id>996153a7-af52-3fc9-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>sql</selectedRelationships>
<source>
<groupId>bf62e045-cc1c-337d-0000-000000000000</groupId>
<id>5aea32fb-0913-3374-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>d017284c-fcc9-3cec-0000-000000000000</id>
<parentGroupId>bf62e045-cc1c-337d-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>1160.8798828125</x>
<y>383.9599304199219</y>
</bends>
<destination>
<groupId>bf62e045-cc1c-337d-0000-000000000000</groupId>
<id>5aea32fb-0913-3374-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>bf62e045-cc1c-337d-0000-000000000000</groupId>
<id>30e68bbe-7c4e-3801-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>f0820637-94df-36b4-0000-000000000000</id>
<parentGroupId>bf62e045-cc1c-337d-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>973.6798706054688</x>
<y>238.5199737548828</y>
</bends>
<destination>
<groupId>bf62e045-cc1c-337d-0000-000000000000</groupId>
<id>30e68bbe-7c4e-3801-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>0</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>bf62e045-cc1c-337d-0000-000000000000</groupId>
<id>58ecbb8e-8413-328f-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>41118b95-7754-328f-0000-000000000000</id>
<parentGroupId>bf62e045-cc1c-337d-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>bf62e045-cc1c-337d-0000-000000000000</groupId>
<id>10924146-4f7a-3544-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<selectedRelationships>incompatible</selectedRelationships>
<source>
<groupId>bf62e045-cc1c-337d-0000-000000000000</groupId>
<id>58ecbb8e-8413-328f-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>4c909c34-7018-3873-0000-000000000000</id>
<parentGroupId>bf62e045-cc1c-337d-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>bf62e045-cc1c-337d-0000-000000000000</groupId>
<id>10924146-4f7a-3544-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<source>
<groupId>bf62e045-cc1c-337d-0000-000000000000</groupId>
<id>30e68bbe-7c4e-3801-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>5b2ba57d-b9bc-3d73-0000-000000000000</id>
<parentGroupId>bf62e045-cc1c-337d-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>836.8798828125</x>
<y>87.31999206542969</y>
</bends>
<destination>
<groupId>bf62e045-cc1c-337d-0000-000000000000</groupId>
<id>58ecbb8e-8413-328f-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>0</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>bf62e045-cc1c-337d-0000-000000000000</groupId>
<id>de82a955-f361-3329-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>6014f380-99f9-35b4-0000-000000000000</id>
<parentGroupId>bf62e045-cc1c-337d-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>bf62e045-cc1c-337d-0000-000000000000</groupId>
<id>10924146-4f7a-3544-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<source>
<groupId>bf62e045-cc1c-337d-0000-000000000000</groupId>
<id>996153a7-af52-3fc9-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<controllerServices>
<id>7b2c5a54-22df-3913-0000-000000000000</id>
<parentGroupId>bf62e045-cc1c-337d-0000-000000000000</parentGroupId>
<bundle>
<artifact>nifi-dbcp-service-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<comments></comments>
<descriptors>
<entry>
<key>Database Connection URL</key>
<value>
<name>Database Connection URL</name>
</value>
</entry>
<entry>
<key>Database Driver Class Name</key>
<value>
<name>Database Driver Class Name</name>
</value>
</entry>
<entry>
<key>database-driver-locations</key>
<value>
<name>database-driver-locations</name>
</value>
</entry>
<entry>
<key>Database User</key>
<value>
<name>Database User</name>
</value>
</entry>
<entry>
<key>Password</key>
<value>
<name>Password</name>
</value>
</entry>
<entry>
<key>Max Wait Time</key>
<value>
<name>Max Wait Time</name>
</value>
</entry>
<entry>
<key>Max Total Connections</key>
<value>
<name>Max Total Connections</name>
</value>
</entry>
<entry>
<key>Validation-query</key>
<value>
<name>Validation-query</name>
</value>
</entry>
</descriptors>
<name>DBCPConnectionPool</name>
<persistsState>false</persistsState>
<properties>
<entry>
<key>Database Connection URL</key>
<value>jdbc:mysql://192.168.99.100:3306/nifi_test</value>
</entry>
<entry>
<key>Database Driver Class Name</key>
<value>com.mysql.jdbc.Driver</value>
</entry>
<entry>
<key>database-driver-locations</key>
<value>/Users/koji/Downloads/mysql-connector-java-5.1.41/mysql-connector-java-5.1.41-bin.jar</value>
</entry>
<entry>
<key>Database User</key>
<value>root</value>
</entry>
<entry>
<key>Password</key>
</entry>
<entry>
<key>Max Wait Time</key>
</entry>
<entry>
<key>Max Total Connections</key>
</entry>
<entry>
<key>Validation-query</key>
</entry>
</properties>
<state>ENABLED</state>
<type>org.apache.nifi.dbcp.DBCPConnectionPool</type>
</controllerServices>
<processors>
<id>996153a7-af52-3fc9-0000-000000000000</id>
<parentGroupId>bf62e045-cc1c-337d-0000-000000000000</parentGroupId>
<position>
<x>795.1199438476563</x>
<y>628.7600561523436</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>JDBC Connection Pool</key>
<value>
<identifiesControllerService>org.apache.nifi.dbcp.DBCPService</identifiesControllerService>
<name>JDBC Connection Pool</name>
</value>
</entry>
<entry>
<key>Support Fragmented Transactions</key>
<value>
<name>Support Fragmented Transactions</name>
</value>
</entry>
<entry>
<key>Transaction Timeout</key>
<value>
<name>Transaction Timeout</name>
</value>
</entry>
<entry>
<key>Batch Size</key>
<value>
<name>Batch Size</name>
</value>
</entry>
<entry>
<key>Obtain Generated Keys</key>
<value>
<name>Obtain Generated Keys</name>
</value>
</entry>
<entry>
<key>rollback-on-failure</key>
<value>
<name>rollback-on-failure</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>JDBC Connection Pool</key>
<value>7b2c5a54-22df-3913-0000-000000000000</value>
</entry>
<entry>
<key>Support Fragmented Transactions</key>
<value>true</value>
</entry>
<entry>
<key>Transaction Timeout</key>
</entry>
<entry>
<key>Batch Size</key>
<value>100</value>
</entry>
<entry>
<key>Obtain Generated Keys</key>
<value>false</value>
</entry>
<entry>
<key>rollback-on-failure</key>
<value>false</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>PutSQL</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>retry</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style></style>
<type>org.apache.nifi.processors.standard.PutSQL</type>
</processors>
<processors>
<id>de82a955-f361-3329-0000-000000000000</id>
<parentGroupId>bf62e045-cc1c-337d-0000-000000000000</parentGroupId>
<position>
<x>360.0</x>
<y>27.0</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>File Size</key>
<value>
<name>File Size</name>
</value>
</entry>
<entry>
<key>Batch Size</key>
<value>
<name>Batch Size</name>
</value>
</entry>
<entry>
<key>Data Format</key>
<value>
<name>Data Format</name>
</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>
<name>Unique FlowFiles</name>
</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>
<name>generate-ff-custom-text</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>File Size</key>
<value>0B</value>
</entry>
<entry>
<key>Batch Size</key>
<value>1</value>
</entry>
<entry>
<key>Data Format</key>
<value>Text</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>false</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>ID,CITY_NAME,ZIP_CD,STATE_CD
001,CITY_A,1111,AA
002,CITY_B,2222,BB
003,CITY_C,3333,CC
004,CITY_D,4444,DD</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>1d</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>GenerateFlowFile</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style></style>
<type>org.apache.nifi.processors.standard.GenerateFlowFile</type>
</processors>
<processors>
<id>10924146-4f7a-3544-0000-000000000000</id>
<parentGroupId>bf62e045-cc1c-337d-0000-000000000000</parentGroupId>
<position>
<x>151.44000000000005</x>
<y>602.8399999999999</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Log Level</key>
<value>
<name>Log Level</name>
</value>
</entry>
<entry>
<key>Log Payload</key>
<value>
<name>Log Payload</name>
</value>
</entry>
<entry>
<key>Attributes to Log</key>
<value>
<name>Attributes to Log</name>
</value>
</entry>
<entry>
<key>attributes-to-log-regex</key>
<value>
<name>attributes-to-log-regex</name>
</value>
</entry>
<entry>
<key>Attributes to Ignore</key>
<value>
<name>Attributes to Ignore</name>
</value>
</entry>
<entry>
<key>attributes-to-ignore-regex</key>
<value>
<name>attributes-to-ignore-regex</name>
</value>
</entry>
<entry>
<key>Log prefix</key>
<value>
<name>Log prefix</name>
</value>
</entry>
<entry>
<key>character-set</key>
<value>
<name>character-set</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Log Level</key>
<value>info</value>
</entry>
<entry>
<key>Log Payload</key>
<value>false</value>
</entry>
<entry>
<key>Attributes to Log</key>
</entry>
<entry>
<key>attributes-to-log-regex</key>
<value>.*</value>
</entry>
<entry>
<key>Attributes to Ignore</key>
</entry>
<entry>
<key>attributes-to-ignore-regex</key>
</entry>
<entry>
<key>Log prefix</key>
</entry>
<entry>
<key>character-set</key>
<value>UTF-8</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>LogAttribute</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>STOPPED</state>
<style></style>
<type>org.apache.nifi.processors.standard.LogAttribute</type>
</processors>
<processors>
<id>30e68bbe-7c4e-3801-0000-000000000000</id>
<parentGroupId>bf62e045-cc1c-337d-0000-000000000000</parentGroupId>
<position>
<x>629.520048828125</x>
<y>317.72004150390626</y>
</position>
<bundle>
<artifact>nifi-avro-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>JSON container options</key>
<value>
<name>JSON container options</name>
</value>
</entry>
<entry>
<key>Wrap Single Record</key>
<value>
<name>Wrap Single Record</name>
</value>
</entry>
<entry>
<key>Avro schema</key>
<value>
<name>Avro schema</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>JSON container options</key>
<value>array</value>
</entry>
<entry>
<key>Wrap Single Record</key>
<value>false</value>
</entry>
<entry>
<key>Avro schema</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>ConvertAvroToJSON</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style></style>
<type>org.apache.nifi.processors.avro.ConvertAvroToJSON</type>
</processors>
<processors>
<id>58ecbb8e-8413-328f-0000-000000000000</id>
<parentGroupId>bf62e045-cc1c-337d-0000-000000000000</parentGroupId>
<position>
<x>476.27996826171875</x>
<y>171.9600067138672</y>
</position>
<bundle>
<artifact>nifi-kite-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Hadoop configuration files</key>
<value>
<name>Hadoop configuration files</name>
</value>
</entry>
<entry>
<key>Record schema</key>
<value>
<name>Record schema</name>
</value>
</entry>
<entry>
<key>CSV charset</key>
<value>
<name>CSV charset</name>
</value>
</entry>
<entry>
<key>CSV delimiter</key>
<value>
<name>CSV delimiter</name>
</value>
</entry>
<entry>
<key>CSV quote character</key>
<value>
<name>CSV quote character</name>
</value>
</entry>
<entry>
<key>CSV escape character</key>
<value>
<name>CSV escape character</name>
</value>
</entry>
<entry>
<key>Use CSV header line</key>
<value>
<name>Use CSV header line</name>
</value>
</entry>
<entry>
<key>Lines to skip</key>
<value>
<name>Lines to skip</name>
</value>
</entry>
<entry>
<key>kite-compression-type</key>
<value>
<name>kite-compression-type</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Hadoop configuration files</key>
</entry>
<entry>
<key>Record schema</key>
<value>{
"type": "record",
"namespace": "com.example",
"name": "City",
"fields": [
{ "name": "ID", "type": "string" },
{ "name": "CITY_NAME", "type": "string" },
{ "name": "ZIP_CD", "type": "int" },
{ "name": "STATE_CD", "type": "string" }
]
} </value>
</entry>
<entry>
<key>CSV charset</key>
<value>utf8</value>
</entry>
<entry>
<key>CSV delimiter</key>
<value>,</value>
</entry>
<entry>
<key>CSV quote character</key>
<value>"</value>
</entry>
<entry>
<key>CSV escape character</key>
<value>\</value>
</entry>
<entry>
<key>Use CSV header line</key>
<value>false</value>
</entry>
<entry>
<key>Lines to skip</key>
<value>1</value>
</entry>
<entry>
<key>kite-compression-type</key>
<value>SNAPPY</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>ConvertCSVToAvro</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>incompatible</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style></style>
<type>org.apache.nifi.processors.kite.ConvertCSVToAvro</type>
</processors>
<processors>
<id>5aea32fb-0913-3374-0000-000000000000</id>
<parentGroupId>bf62e045-cc1c-337d-0000-000000000000</parentGroupId>
<position>
<x>730.3200366210938</x>
<y>468.9199682617186</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>JDBC Connection Pool</key>
<value>
<identifiesControllerService>org.apache.nifi.dbcp.DBCPService</identifiesControllerService>
<name>JDBC Connection Pool</name>
</value>
</entry>
<entry>
<key>Statement Type</key>
<value>
<name>Statement Type</name>
</value>
</entry>
<entry>
<key>Table Name</key>
<value>
<name>Table Name</name>
</value>
</entry>
<entry>
<key>Catalog Name</key>
<value>
<name>Catalog Name</name>
</value>
</entry>
<entry>
<key>Schema Name</key>
<value>
<name>Schema Name</name>
</value>
</entry>
<entry>
<key>Translate Field Names</key>
<value>
<name>Translate Field Names</name>
</value>
</entry>
<entry>
<key>Unmatched Field Behavior</key>
<value>
<name>Unmatched Field Behavior</name>
</value>
</entry>
<entry>
<key>Unmatched Column Behavior</key>
<value>
<name>Unmatched Column Behavior</name>
</value>
</entry>
<entry>
<key>Update Keys</key>
<value>
<name>Update Keys</name>
</value>
</entry>
<entry>
<key>jts-quoted-identifiers</key>
<value>
<name>jts-quoted-identifiers</name>
</value>
</entry>
<entry>
<key>jts-quoted-table-identifiers</key>
<value>
<name>jts-quoted-table-identifiers</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>JDBC Connection Pool</key>
<value>7b2c5a54-22df-3913-0000-000000000000</value>
</entry>
<entry>
<key>Statement Type</key>
<value>INSERT</value>
</entry>
<entry>
<key>Table Name</key>
<value>cities</value>
</entry>
<entry>
<key>Catalog Name</key>
</entry>
<entry>
<key>Schema Name</key>
</entry>
<entry>
<key>Translate Field Names</key>
<value>true</value>
</entry>
<entry>
<key>Unmatched Field Behavior</key>
<value>Ignore Unmatched Fields</value>
</entry>
<entry>
<key>Unmatched Column Behavior</key>
<value>Fail on Unmatched Columns</value>
</entry>
<entry>
<key>Update Keys</key>
</entry>
<entry>
<key>jts-quoted-identifiers</key>
<value>false</value>
</entry>
<entry>
<key>jts-quoted-table-identifiers</key>
<value>false</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>ConvertJSONToSQL</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>original</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>sql</name>
</relationships>
<state>RUNNING</state>
<style></style>
<type>org.apache.nifi.processors.standard.ConvertJSONToSQL</type>
</processors>
</contents>
<name>Load CSV to Table</name>
</processGroups>
</snippet>
<timestamp>09/27/2017 11:03:45 JST</timestamp>
</template>
<?xml version="1.0" ?>
<template encoding-version="1.1">
<description></description>
<groupId>73ae6ed8-015e-1000-89f3-94907fd76646</groupId>
<name>Load CSV to Table with Record</name>
<snippet>
<processGroups>
<id>f81a2949-1728-32f5-0000-000000000000</id>
<parentGroupId>c72c436c-5a63-35af-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>0.0</y>
</position>
<comments></comments>
<contents>
<connections>
<id>4688ac05-b1e3-3e8d-0000-000000000000</id>
<parentGroupId>f81a2949-1728-32f5-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>f81a2949-1728-32f5-0000-000000000000</groupId>
<id>39fc92eb-59e0-3e02-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<source>
<groupId>f81a2949-1728-32f5-0000-000000000000</groupId>
<id>ce225866-ef86-3a36-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>6b6877b3-a10d-3536-0000-000000000000</id>
<parentGroupId>f81a2949-1728-32f5-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>847.5253295898438</x>
<y>92.41309356689453</y>
</bends>
<destination>
<groupId>f81a2949-1728-32f5-0000-000000000000</groupId>
<id>ce225866-ef86-3a36-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>0</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>f81a2949-1728-32f5-0000-000000000000</groupId>
<id>e15c2e2b-2638-32ac-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<controllerServices>
<id>dfd8da63-1639-3354-0000-000000000000</id>
<parentGroupId>f81a2949-1728-32f5-0000-000000000000</parentGroupId>
<bundle>
<artifact>nifi-dbcp-service-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<comments></comments>
<descriptors>
<entry>
<key>Database Connection URL</key>
<value>
<name>Database Connection URL</name>
</value>
</entry>
<entry>
<key>Database Driver Class Name</key>
<value>
<name>Database Driver Class Name</name>
</value>
</entry>
<entry>
<key>database-driver-locations</key>
<value>
<name>database-driver-locations</name>
</value>
</entry>
<entry>
<key>Database User</key>
<value>
<name>Database User</name>
</value>
</entry>
<entry>
<key>Password</key>
<value>
<name>Password</name>
</value>
</entry>
<entry>
<key>Max Wait Time</key>
<value>
<name>Max Wait Time</name>
</value>
</entry>
<entry>
<key>Max Total Connections</key>
<value>
<name>Max Total Connections</name>
</value>
</entry>
<entry>
<key>Validation-query</key>
<value>
<name>Validation-query</name>
</value>
</entry>
</descriptors>
<name>DBCPConnectionPool</name>
<persistsState>false</persistsState>
<properties>
<entry>
<key>Database Connection URL</key>
<value>jdbc:mysql://192.168.99.100:3306/nifi_test</value>
</entry>
<entry>
<key>Database Driver Class Name</key>
<value>com.mysql.jdbc.Driver</value>
</entry>
<entry>
<key>database-driver-locations</key>
<value>/Users/koji/Downloads/mysql-connector-java-5.1.41/mysql-connector-java-5.1.41-bin.jar</value>
</entry>
<entry>
<key>Database User</key>
<value>root</value>
</entry>
<entry>
<key>Password</key>
</entry>
<entry>
<key>Max Wait Time</key>
</entry>
<entry>
<key>Max Total Connections</key>
</entry>
<entry>
<key>Validation-query</key>
</entry>
</properties>
<state>ENABLED</state>
<type>org.apache.nifi.dbcp.DBCPConnectionPool</type>
</controllerServices>
<controllerServices>
<id>02a8d039-7d23-371b-0000-000000000000</id>
<parentGroupId>f81a2949-1728-32f5-0000-000000000000</parentGroupId>
<bundle>
<artifact>nifi-record-serialization-services-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<comments></comments>
<descriptors>
<entry>
<key>schema-access-strategy</key>
<value>
<name>schema-access-strategy</name>
</value>
</entry>
<entry>
<key>schema-registry</key>
<value>
<identifiesControllerService>org.apache.nifi.schemaregistry.services.SchemaRegistry</identifiesControllerService>
<name>schema-registry</name>
</value>
</entry>
<entry>
<key>schema-name</key>
<value>
<name>schema-name</name>
</value>
</entry>
<entry>
<key>schema-text</key>
<value>
<name>schema-text</name>
</value>
</entry>
<entry>
<key>Date Format</key>
<value>
<name>Date Format</name>
</value>
</entry>
<entry>
<key>Time Format</key>
<value>
<name>Time Format</name>
</value>
</entry>
<entry>
<key>Timestamp Format</key>
<value>
<name>Timestamp Format</name>
</value>
</entry>
<entry>
<key>CSV Format</key>
<value>
<name>CSV Format</name>
</value>
</entry>
<entry>
<key>Value Separator</key>
<value>
<name>Value Separator</name>
</value>
</entry>
<entry>
<key>Skip Header Line</key>
<value>
<name>Skip Header Line</name>
</value>
</entry>
<entry>
<key>ignore-csv-header</key>
<value>
<name>ignore-csv-header</name>
</value>
</entry>
<entry>
<key>Quote Character</key>
<value>
<name>Quote Character</name>
</value>
</entry>
<entry>
<key>Escape Character</key>
<value>
<name>Escape Character</name>
</value>
</entry>
<entry>
<key>Comment Marker</key>
<value>
<name>Comment Marker</name>
</value>
</entry>
<entry>
<key>Null String</key>
<value>
<name>Null String</name>
</value>
</entry>
<entry>
<key>Trim Fields</key>
<value>
<name>Trim Fields</name>
</value>
</entry>
</descriptors>
<name>CSVReader</name>
<persistsState>false</persistsState>
<properties>
<entry>
<key>schema-access-strategy</key>
<value>schema-text-property</value>
</entry>
<entry>
<key>schema-registry</key>
</entry>
<entry>
<key>schema-name</key>
</entry>
<entry>
<key>schema-text</key>
<value>{
"type": "record",
"namespace": "com.example",
"name": "City",
"fields": [
{ "name": "ID", "type": "string" },
{ "name": "CITY_NAME", "type": "string" },
{ "name": "ZIP_CD", "type": "int" },
{ "name": "STATE_CD", "type": "string" }
]
}</value>
</entry>
<entry>
<key>Date Format</key>
</entry>
<entry>
<key>Time Format</key>
</entry>
<entry>
<key>Timestamp Format</key>
</entry>
<entry>
<key>CSV Format</key>
</entry>
<entry>
<key>Value Separator</key>
</entry>
<entry>
<key>Skip Header Line</key>
<value>true</value>
</entry>
<entry>
<key>ignore-csv-header</key>
<value>true</value>
</entry>
<entry>
<key>Quote Character</key>
</entry>
<entry>
<key>Escape Character</key>
</entry>
<entry>
<key>Comment Marker</key>
</entry>
<entry>
<key>Null String</key>
</entry>
<entry>
<key>Trim Fields</key>
</entry>
</properties>
<state>ENABLED</state>
<type>org.apache.nifi.csv.CSVReader</type>
</controllerServices>
<processors>
<id>ce225866-ef86-3a36-0000-000000000000</id>
<parentGroupId>f81a2949-1728-32f5-0000-000000000000</parentGroupId>
<position>
<x>536.2390607198079</x>
<y>173.88254214555786</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>put-db-record-record-reader</key>
<value>
<identifiesControllerService>org.apache.nifi.serialization.RecordReaderFactory</identifiesControllerService>
<name>put-db-record-record-reader</name>
</value>
</entry>
<entry>
<key>put-db-record-statement-type</key>
<value>
<name>put-db-record-statement-type</name>
</value>
</entry>
<entry>
<key>put-db-record-dcbp-service</key>
<value>
<identifiesControllerService>org.apache.nifi.dbcp.DBCPService</identifiesControllerService>
<name>put-db-record-dcbp-service</name>
</value>
</entry>
<entry>
<key>put-db-record-catalog-name</key>
<value>
<name>put-db-record-catalog-name</name>
</value>
</entry>
<entry>
<key>put-db-record-schema-name</key>
<value>
<name>put-db-record-schema-name</name>
</value>
</entry>
<entry>
<key>put-db-record-table-name</key>
<value>
<name>put-db-record-table-name</name>
</value>
</entry>
<entry>
<key>put-db-record-translate-field-names</key>
<value>
<name>put-db-record-translate-field-names</name>
</value>
</entry>
<entry>
<key>put-db-record-unmatched-field-behavior</key>
<value>
<name>put-db-record-unmatched-field-behavior</name>
</value>
</entry>
<entry>
<key>put-db-record-unmatched-column-behavior</key>
<value>
<name>put-db-record-unmatched-column-behavior</name>
</value>
</entry>
<entry>
<key>put-db-record-update-keys</key>
<value>
<name>put-db-record-update-keys</name>
</value>
</entry>
<entry>
<key>put-db-record-field-containing-sql</key>
<value>
<name>put-db-record-field-containing-sql</name>
</value>
</entry>
<entry>
<key>put-db-record-quoted-identifiers</key>
<value>
<name>put-db-record-quoted-identifiers</name>
</value>
</entry>
<entry>
<key>put-db-record-quoted-table-identifiers</key>
<value>
<name>put-db-record-quoted-table-identifiers</name>
</value>
</entry>
<entry>
<key>put-db-record-query-timeout</key>
<value>
<name>put-db-record-query-timeout</name>
</value>
</entry>
<entry>
<key>rollback-on-failure</key>
<value>
<name>rollback-on-failure</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>put-db-record-record-reader</key>
<value>02a8d039-7d23-371b-0000-000000000000</value>
</entry>
<entry>
<key>put-db-record-statement-type</key>
<value>INSERT</value>
</entry>
<entry>
<key>put-db-record-dcbp-service</key>
<value>dfd8da63-1639-3354-0000-000000000000</value>
</entry>
<entry>
<key>put-db-record-catalog-name</key>
</entry>
<entry>
<key>put-db-record-schema-name</key>
</entry>
<entry>
<key>put-db-record-table-name</key>
<value>cities</value>
</entry>
<entry>
<key>put-db-record-translate-field-names</key>
<value>true</value>
</entry>
<entry>
<key>put-db-record-unmatched-field-behavior</key>
<value>Ignore Unmatched Fields</value>
</entry>
<entry>
<key>put-db-record-unmatched-column-behavior</key>
<value>Fail on Unmatched Columns</value>
</entry>
<entry>
<key>put-db-record-update-keys</key>
</entry>
<entry>
<key>put-db-record-field-containing-sql</key>
</entry>
<entry>
<key>put-db-record-quoted-identifiers</key>
<value>false</value>
</entry>
<entry>
<key>put-db-record-quoted-table-identifiers</key>
<value>false</value>
</entry>
<entry>
<key>put-db-record-query-timeout</key>
<value>0 seconds</value>
</entry>
<entry>
<key>rollback-on-failure</key>
<value>false</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>PutDatabaseRecord</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>retry</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style></style>
<type>org.apache.nifi.processors.standard.PutDatabaseRecord</type>
</processors>
<processors>
<id>e15c2e2b-2638-32ac-0000-000000000000</id>
<parentGroupId>f81a2949-1728-32f5-0000-000000000000</parentGroupId>
<position>
<x>360.0</x>
<y>27.0</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>File Size</key>
<value>
<name>File Size</name>
</value>
</entry>
<entry>
<key>Batch Size</key>
<value>
<name>Batch Size</name>
</value>
</entry>
<entry>
<key>Data Format</key>
<value>
<name>Data Format</name>
</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>
<name>Unique FlowFiles</name>
</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>
<name>generate-ff-custom-text</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>File Size</key>
<value>0B</value>
</entry>
<entry>
<key>Batch Size</key>
<value>1</value>
</entry>
<entry>
<key>Data Format</key>
<value>Text</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>false</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>ID,CITY_NAME,ZIP_CD,STATE_CD
001,CITY_A,1111,AA
002,CITY_B,2222,BB
003,CITY_C,3333,CC
004,CITY_D,4444,DD</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>1d</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>GenerateFlowFile</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style></style>
<type>org.apache.nifi.processors.standard.GenerateFlowFile</type>
</processors>
<processors>
<id>39fc92eb-59e0-3e02-0000-000000000000</id>
<parentGroupId>f81a2949-1728-32f5-0000-000000000000</parentGroupId>
<position>
<x>352.07363647460943</x>
<y>420.44577392578117</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Log Level</key>
<value>
<name>Log Level</name>
</value>
</entry>
<entry>
<key>Log Payload</key>
<value>
<name>Log Payload</name>
</value>
</entry>
<entry>
<key>Attributes to Log</key>
<value>
<name>Attributes to Log</name>
</value>
</entry>
<entry>
<key>attributes-to-log-regex</key>
<value>
<name>attributes-to-log-regex</name>
</value>
</entry>
<entry>
<key>Attributes to Ignore</key>
<value>
<name>Attributes to Ignore</name>
</value>
</entry>
<entry>
<key>attributes-to-ignore-regex</key>
<value>
<name>attributes-to-ignore-regex</name>
</value>
</entry>
<entry>
<key>Log prefix</key>
<value>
<name>Log prefix</name>
</value>
</entry>
<entry>
<key>character-set</key>
<value>
<name>character-set</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Log Level</key>
<value>info</value>
</entry>
<entry>
<key>Log Payload</key>
<value>false</value>
</entry>
<entry>
<key>Attributes to Log</key>
</entry>
<entry>
<key>attributes-to-log-regex</key>
<value>.*</value>
</entry>
<entry>
<key>Attributes to Ignore</key>
</entry>
<entry>
<key>attributes-to-ignore-regex</key>
</entry>
<entry>
<key>Log prefix</key>
</entry>
<entry>
<key>character-set</key>
<value>UTF-8</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>LogAttribute</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>STOPPED</state>
<style></style>
<type>org.apache.nifi.processors.standard.LogAttribute</type>
</processors>
</contents>
<name>Load CSV to Table with Record</name>
</processGroups>
</snippet>
<timestamp>09/27/2017 11:03:56 JST</timestamp>
</template>
<groupId>f81a2949-1728-32f5-0000-000000000000</groupId>
<id>e15c2e2b-2638-32ac-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<controllerServices>
<id>dfd8da63-1639-3354-0000-000000000000</id>
<parentGroupId>f81a2949-1728-32f5-0000-000000000000</parentGroupId>
<bundle>
<artifact>nifi-dbcp-service-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<comments></comments>
<descriptors>
<entry>
<key>Database Connection URL</key>
<value>
<name>Database Connection URL</name>
</value>
</entry>
<entry>
<key>Database Driver Class Name</key>
<value>
<name>Database Driver Class Name</name>
</value>
</entry>
<entry>
<key>database-driver-locations</key>
<value>
<name>database-driver-locations</name>
</value>
</entry>
<entry>
<key>Database User</key>
<value>
<name>Database User</name>
</value>
</entry>
<entry>
<key>Password</key>
<value>
<name>Password</name>
</value>
</entry>
<entry>
<key>Max Wait Time</key>
<value>
<name>Max Wait Time</name>
</value>
</entry>
<entry>
<key>Max Total Connections</key>
<value>
<name>Max Total Connections</name>
</value>
</entry>
<entry>
<key>Validation-query</key>
<value>
<name>Validation-query</name>
</value>
</entry>
</descriptors>
<name>DBCPConnectionPool</name>
<persistsState>false</persistsState>
<properties>
<entry>
<key>Database Connection URL</key>
<value>jdbc:mysql://192.168.99.100:3306/nifi_test</value>
</entry>
<entry>
<key>Database Driver Class Name</key>
<value>com.mysql.jdbc.Driver</value>
</entry>
<entry>
<key>database-driver-locations</key>
<value>/Users/koji/Downloads/mysql-connector-java-5.1.41/mysql-connector-java-5.1.41-bin.jar</value>
</entry>
<entry>
<key>Database User</key>
<value>root</value>
</entry>
<entry>
<key>Password</key>
</entry>
<entry>
<key>Max Wait Time</key>
</entry>
<entry>
<key>Max Total Connections</key>
</entry>
<entry>
<key>Validation-query</key>
</entry>
</properties>
<state>ENABLED</state>
<type>org.apache.nifi.dbcp.DBCPConnectionPool</type>
</controllerServices>
<controllerServices>
<id>02a8d039-7d23-371b-0000-000000000000</id>
<parentGroupId>f81a2949-1728-32f5-0000-000000000000</parentGroupId>
<bundle>
<artifact>nifi-record-serialization-services-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<comments></comments>
<descriptors>
<entry>
<key>schema-access-strategy</key>
<value>
<name>schema-access-strategy</name>
</value>
</entry>
<entry>
<key>schema-registry</key>
<value>
<identifiesControllerService>org.apache.nifi.schemaregistry.services.SchemaRegistry</identifiesControllerService>
<name>schema-registry</name>
</value>
</entry>
<entry>
<key>schema-name</key>
<value>
<name>schema-name</name>
</value>
</entry>
<entry>
<key>schema-text</key>
<value>
<name>schema-text</name>
</value>
</entry>
<entry>
<key>Date Format</key>
<value>
<name>Date Format</name>
</value>
</entry>
<entry>
<key>Time Format</key>
<value>
<name>Time Format</name>
</value>
</entry>
<entry>
<key>Timestamp Format</key>
<value>
<name>Timestamp Format</name>
</value>
</entry>
<entry>
<key>CSV Format</key>
<value>
<name>CSV Format</name>
</value>
</entry>
<entry>
<key>Value Separator</key>
<value>
<name>Value Separator</name>
</value>
</entry>
<entry>
<key>Skip Header Line</key>
<value>
<name>Skip Header Line</name>
</value>
</entry>
<entry>
<key>ignore-csv-header</key>
<value>
<name>ignore-csv-header</name>
</value>
</entry>
<entry>
<key>Quote Character</key>
<value>
<name>Quote Character</name>
</value>
</entry>
<entry>
<key>Escape Character</key>
<value>
<name>Escape Character</name>
</value>
</entry>
<entry>
<key>Comment Marker</key>
<value>
<name>Comment Marker</name>
</value>
</entry>
<entry>
<key>Null String</key>
<value>
<name>Null String</name>
</value>
</entry>
<entry>
<key>Trim Fields</key>
<value>
<name>Trim Fields</name>
</value>
</entry>
</descriptors>
<name>CSVReader</name>
<persistsState>false</persistsState>
<properties>
<entry>
<key>schema-access-strategy</key>
<value>schema-text-property</value>
</entry>
<entry>
<key>schema-registry</key>
</entry>
<entry>
<key>schema-name</key>
</entry>
<entry>
<key>schema-text</key>
<value>{
"type": "record",
"namespace": "com.example",
"name": "City",
"fields": [
{ "name": "ID", "type": "string" },
{ "name": "CITY_NAME", "type": "string" },
{ "name": "ZIP_CD", "type": "int" },
{ "name": "STATE_CD", "type": "string" }
]
}</value>
</entry>
<entry>
<key>Date Format</key>
</entry>
<entry>
<key>Time Format</key>
</entry>
<entry>
<key>Timestamp Format</key>
</entry>
<entry>
<key>CSV Format</key>
</entry>
<entry>
<key>Value Separator</key>
</entry>
<entry>
<key>Skip Header Line</key>
<value>true</value>
</entry>
<entry>
<key>ignore-csv-header</key>
<value>true</value>
</entry>
<entry>
<key>Quote Character</key>
</entry>
<entry>
<key>Escape Character</key>
</entry>
<entry>
<key>Comment Marker</key>
</entry>
<entry>
<key>Null String</key>
</entry>
<entry>
<key>Trim Fields</key>
</entry>
</properties>
<state>ENABLED</state>
<type>org.apache.nifi.csv.CSVReader</type>
</controllerServices>
<processors>
<id>ce225866-ef86-3a36-0000-000000000000</id>
<parentGroupId>f81a2949-1728-32f5-0000-000000000000</parentGroupId>
<position>
<x>536.2390607198079</x>
<y>173.88254214555786</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>put-db-record-record-reader</key>
<value>
<identifiesControllerService>org.apache.nifi.serialization.RecordReaderFactory</identifiesControllerService>
<name>put-db-record-record-reader</name>
</value>
</entry>
<entry>
<key>put-db-record-statement-type</key>
<value>
<name>put-db-record-statement-type</name>
</value>
</entry>
<entry>
<key>put-db-record-dcbp-service</key>
<value>
<identifiesControllerService>org.apache.nifi.dbcp.DBCPService</identifiesControllerService>
<name>put-db-record-dcbp-service</name>
</value>
</entry>
<entry>
<key>put-db-record-catalog-name</key>
<value>
<name>put-db-record-catalog-name</name>
</value>
</entry>
<entry>
<key>put-db-record-schema-name</key>
<value>
<name>put-db-record-schema-name</name>
</value>
</entry>
<entry>
<key>put-db-record-table-name</key>
<value>
<name>put-db-record-table-name</name>
</value>
</entry>
<entry>
<key>put-db-record-translate-field-names</key>
<value>
<name>put-db-record-translate-field-names</name>
</value>
</entry>
<entry>
<key>put-db-record-unmatched-field-behavior</key>
<value>
<name>put-db-record-unmatched-field-behavior</name>
</value>
</entry>
<entry>
<key>put-db-record-unmatched-column-behavior</key>
<value>
<name>put-db-record-unmatched-column-behavior</name>
</value>
</entry>
<entry>
<key>put-db-record-update-keys</key>
<value>
<name>put-db-record-update-keys</name>
</value>
</entry>
<entry>
<key>put-db-record-field-containing-sql</key>
<value>
<name>put-db-record-field-containing-sql</name>
</value>
</entry>
<entry>
<key>put-db-record-quoted-identifiers</key>
<value>
<name>put-db-record-quoted-identifiers</name>
</value>
</entry>
<entry>
<key>put-db-record-quoted-table-identifiers</key>
<value>
<name>put-db-record-quoted-table-identifiers</name>
</value>
</entry>
<entry>
<key>put-db-record-query-timeout</key>
<value>
<name>put-db-record-query-timeout</name>
</value>
</entry>
<entry>
<key>rollback-on-failure</key>
<value>
<name>rollback-on-failure</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>put-db-record-record-reader</key>
<value>02a8d039-7d23-371b-0000-000000000000</value>
</entry>
<entry>
<key>put-db-record-statement-type</key>
<value>INSERT</value>
</entry>
<entry>
<key>put-db-record-dcbp-service</key>
<value>dfd8da63-1639-3354-0000-000000000000</value>
</entry>
<entry>
<key>put-db-record-catalog-name</key>
</entry>
<entry>
<key>put-db-record-schema-name</key>
</entry>
<entry>
<key>put-db-record-table-name</key>
<value>cities</value>
</entry>
<entry>
<key>put-db-record-translate-field-names</key>
<value>true</value>
</entry>
<entry>
<key>put-db-record-unmatched-field-behavior</key>
<value>Ignore Unmatched Fields</value>
</entry>
<entry>
<key>put-db-record-unmatched-column-behavior</key>
<value>Fail on Unmatched Columns</value>
</entry>
<entry>
<key>put-db-record-update-keys</key>
</entry>
<entry>
<key>put-db-record-field-containing-sql</key>
</entry>
<entry>
<key>put-db-record-quoted-identifiers</key>
<value>false</value>
</entry>
<entry>
<key>put-db-record-quoted-table-identifiers</key>
<value>false</value>
</entry>
<entry>
<key>put-db-record-query-timeout</key>
<value>0 seconds</value>
</entry>
<entry>
<key>rollback-on-failure</key>
<value>false</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>PutDatabaseRecord</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>retry</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style></style>
<type>org.apache.nifi.processors.standard.PutDatabaseRecord</type>
</processors>
<processors>
<id>e15c2e2b-2638-32ac-0000-000000000000</id>
<parentGroupId>f81a2949-1728-32f5-0000-000000000000</parentGroupId>
<position>
<x>360.0</x>
<y>27.0</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>File Size</key>
<value>
<name>File Size</name>
</value>
</entry>
<entry>
<key>Batch Size</key>
<value>
<name>Batch Size</name>
</value>
</entry>
<entry>
<key>Data Format</key>
<value>
<name>Data Format</name>
</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>
<name>Unique FlowFiles</name>
</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>
<name>generate-ff-custom-text</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>File Size</key>
<value>0B</value>
</entry>
<entry>
<key>Batch Size</key>
<value>1</value>
</entry>
<entry>
<key>Data Format</key>
<value>Text</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>false</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>ID,CITY_NAME,ZIP_CD,STATE_CD
001,CITY_A,1111,AA
002,CITY_B,2222,BB
003,CITY_C,3333,CC
004,CITY_D,4444,DD</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>1d</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>GenerateFlowFile</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style></style>
<type>org.apache.nifi.processors.standard.GenerateFlowFile</type>
</processors>
<processors>
<id>39fc92eb-59e0-3e02-0000-000000000000</id>
<parentGroupId>f81a2949-1728-32f5-0000-000000000000</parentGroupId>
<position>
<x>352.07363647460943</x>
<y>420.44577392578117</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Log Level</key>
<value>
<name>Log Level</name>
</value>
</entry>
<entry>
<key>Log Payload</key>
<value>
<name>Log Payload</name>
</value>
</entry>
<entry>
<key>Attributes to Log</key>
<value>
<name>Attributes to Log</name>
</value>
</entry>
<entry>
<key>attributes-to-log-regex</key>
<value>
<name>attributes-to-log-regex</name>
</value>
</entry>
<entry>
<key>Attributes to Ignore</key>
<value>
<name>Attributes to Ignore</name>
</value>
</entry>
<entry>
<key>attributes-to-ignore-regex</key>
<value>
<name>attributes-to-ignore-regex</name>
</value>
</entry>
<entry>
<key>Log prefix</key>
<value>
<name>Log prefix</name>
</value>
</entry>
<entry>
<key>character-set</key>
<value>
<name>character-set</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Log Level</key>
<value>info</value>
</entry>
<entry>
<key>Log Payload</key>
<value>false</value>
</entry>
<entry>
<key>Attributes to Log</key>
</entry>
<entry>
<key>attributes-to-log-regex</key>
<value>.*</value>
</entry>
<entry>
<key>Attributes to Ignore</key>
</entry>
<entry>
<key>attributes-to-ignore-regex</key>
</entry>
<entry>
<key>Log prefix</key>
</entry>
<entry>
<key>character-set</key>
<value>UTF-8</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>LogAttribute</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<state>STOPPED</state>
<style></style>
<type>org.apache.nifi.processors.standard.LogAttribute</type>
</processors>
</contents>
<name>Load CSV to Table with Record</name>
</processGroups>
</snippet>
<timestamp>09/27/2017 11:03:56 JST</timestamp>
</template>

@lingtze1 lingtze1 commented Dec 20, 2017

Is there a way to do an "UPSERT" in NiFi for this kind of scenario? That is, do an insert, but when a key conflicts with a record already in the table, do an update instead.

@pmengaziol pmengaziol commented Mar 28, 2018

I tried to load the second 'with record' template and got:
The specified template is not in a valid format.
NiFi 1.5.0 running on Win7

@miten4u miten4u commented Apr 9, 2018

@lingtze1
For UPSERT, there doesn't seem to be any direct way, but you can try the following:
For each record, run a SQL select query using ExecuteSQL.
Use the flowfile attribute executesql.row.count, which gives the number of records found.
If there are no records, we need to insert.
If the count is greater than 0, it's an update.
You will have to play with RouteOnAttribute.
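
The select-then-route logic described in this comment can be sketched outside NiFi as plain Python. This is only an illustration of the decision, not a NiFi flow: it assumes the `cities` table from the example, uses the standard-library `sqlite3` module as a stand-in for the MySQL database, and the function names `upsert_city` and `make_demo_db` are hypothetical.

```python
import sqlite3


def make_demo_db():
    """Create an in-memory database holding the example `cities` table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "create table cities ("
        " id varchar(4) not null,"
        " city_name varchar(20),"
        " zip_cd smallint,"
        " state_cd varchar(2),"
        " primary key (id))"
    )
    return conn


def upsert_city(conn, row):
    """Insert `row`, or update the existing record when its id is already present.

    Returns "insert" or "update" so the caller can see which branch ran,
    mirroring the two routes a RouteOnAttribute step would produce.
    """
    # Equivalent of the per-record select whose row count drives the routing.
    (count,) = conn.execute(
        "SELECT COUNT(*) FROM cities WHERE id = ?", (row["id"],)
    ).fetchone()

    if count == 0:
        conn.execute(
            "INSERT INTO cities (id, city_name, zip_cd, state_cd)"
            " VALUES (?, ?, ?, ?)",
            (row["id"], row["city_name"], row["zip_cd"], row["state_cd"]),
        )
        return "insert"

    conn.execute(
        "UPDATE cities SET city_name = ?, zip_cd = ?, state_cd = ? WHERE id = ?",
        (row["city_name"], row["zip_cd"], row["state_cd"], row["id"]),
    )
    return "update"
```

Note that a separate select followed by a write is not atomic, so two concurrent writers could both take the insert branch. Since the example database is MySQL, its `INSERT ... ON DUPLICATE KEY UPDATE` statement may be a simpler alternative where a single SQL statement can be issued.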

@lingtze1 lingtze1 commented Apr 17, 2018

I have tried importing "Load_CSV_to_Table_with_Record.xml" into NiFi 1.2 and 1.6, and it says "The specified template is not in a valid format."
I tried both copying the content into an XML file I created myself and extracting the file from the zip; both failed.

@marcushvega marcushvega commented Apr 20, 2018

@lingtze1
I got it to import into NiFi 1.4 by copying and pasting the inner template of "Load_CSV_to_Table_with_Record.xml" into a new document and then loading that document.

Lines 56 through 789
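
The copy-and-paste step from this comment could also be scripted. The following is a minimal sketch, assuming the line range 56 through 789 quoted above still matches the copy of the file you downloaded (it may not if the gist was revised); the helper names and the output file name are hypothetical.

```python
from pathlib import Path


def extract_lines(text: str, first: int, last: int) -> str:
    """Return lines first..last of `text` (1-indexed, inclusive)."""
    lines = text.splitlines(keepends=True)
    return "".join(lines[first - 1:last])


def extract_inner_template(src: str, dst: str, first: int = 56, last: int = 789) -> None:
    """Write the given line range of `src` to `dst` as a standalone file."""
    text = Path(src).read_text(encoding="utf-8")
    Path(dst).write_text(extract_lines(text, first, last), encoding="utf-8")
```

For example, `extract_inner_template("Load_CSV_to_Table_with_Record.xml", "inner_template.xml")` would write the inner template to a new file, which can then be checked in a text editor before uploading it to NiFi.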

@demobo-com demobo-com commented Jun 19, 2018

NiFi 1.6: it says "The specified template is not in a valid format."

@xCozmox xCozmox commented Jun 7, 2019

Load_CSV_to_Table_with_Record.xml cannot be imported into the latest NiFi version.

@khadafi-hub khadafi-hub commented Jun 11, 2020

@marcushvega Thank you, sir, it works for me.
