Introduction

In this tutorial, we will import comma-separated value (CSV) data into GridDB using the popular ETL tool, Apache Nifi. Nifi is an enterprise-ready data plumbing platform that is highly configurable and extensible.

ETL is an acronym for Extract, Transform, and Load, which simply means copying data from a source to a destination while reshaping it to fit the destination's format and context. While there are many advanced enterprise ETL tools, many developers have built rudimentary ETL pipelines out of basic text processing tools like awk, sed, and grep.

In this tutorial, we will cover three transformations that most GridDB users are likely to need:

  • Partition data into multiple tables based on a record column value.
  • Combine multiple columns into one.
  • Convert a datetime string into a timestamp value that can be inserted into GridDB.

Setup Nifi/GridDB

Nifi's installation is simple; download and untar the tarball before starting the server:

https://gist.github.com/79470f2b01f34ec3f7fe3b2c0f580f46

If you want to access the Nifi server from a remote computer, edit conf/nifi.properties, changing nifi.web.http.host to your local IP address.

If you do not already have GridDB set up, follow the Getting Started manual to install it first. You will also need the gridstore-jdbc.jar from GridDB.net's GitHub repository, which implements additional JDBC functions required by Nifi.

The Data Set

We're going to use the Historic NYPD Complaint Data made available by NYC Open Data here. The data has nearly 7 million rows across 35 columns, totaling just over 2 GB. We've also made a 1,000-row version available for testing, which you can download from the GridDB.net GitHub repository here.

Build the Nifi Flow

The Nifi Flow defines how data is extracted, transformed, and loaded using a series of Processors and Controllers. A Processor performs a task, and its inputs and outputs are defined by the Controllers it is configured with. If you're new to Nifi, the navigation can be difficult: Processors are added by dragging the first icon on the top bar into the workspace, while Controllers are added to your Flow by clicking the gear icon in the "Operate" panel and then selecting the "Controller Services" tab.

The following screenshot depicts the overall flow and how the various processors are connected:

0_NifiFlow.png

The LogAttribute processors are generic; they show the number of failed or successful records output by a processor.

The AvroRegistry controller defines schemas that are referenced by the other controllers; in this case, the schema of the CSV file we're reading and the schema of the GridDB table/container we're writing to.

100_AvroRegistry.png

Many records in the NYPD complaint data are missing one or more fields, which can cause errors, so defaults are set in the input schema as follows:

https://gist.github.com/cc4fb56e1704ff8020a462816a9a27ab
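To see how these defaults behave, here is a minimal, standalone Java sketch using the Apache Avro library. The two-field schema is a hypothetical subset of the real one (the full schema is in the gist above); the builder fills in the default for the field we leave unset:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;

public class AvroDefaultsDemo {
    public static void main(String[] args) {
        // Hypothetical two-field subset of the input schema, for illustration only.
        String schemaJson = "{\"type\":\"record\",\"name\":\"Complaint\",\"fields\":["
                + "{\"name\":\"ADDR_PCT_CD\",\"type\":\"string\",\"default\":\"0\"},"
                + "{\"name\":\"PARKS_NM\",\"type\":\"string\",\"default\":\"\"}]}";
        Schema schema = new Schema.Parser().parse(schemaJson);

        // Only ADDR_PCT_CD is set; PARKS_NM falls back to its schema default.
        GenericRecord record = new GenericRecordBuilder(schema)
                .set("ADDR_PCT_CD", "75")
                .build();

        System.out.println(record); // {"ADDR_PCT_CD": "75", "PARKS_NM": ""}
    }
}
```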

The defaults aren't required in the output schema as the fields have been populated, but two fields are added for the CMPLNT_TO/FR timestamp:

https://gist.github.com/3db4c6991912c75ee163da070efa507a

The DBCPConnectionPool controller is responsible for all database connections within our flow and uses the GridDB JDBC interface. It is configured with the JDBC driver class, JAR path, connection URL, username, and password as follows:

99_DBCP.png
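If you'd like to sanity-check these settings outside of Nifi, a minimal Java sketch like the one below opens the same JDBC connection directly. The connection URL, cluster name, and credentials here are placeholders; substitute the values from your own GridDB installation:

```java
import java.sql.Connection;
import java.sql.DriverManager;

public class GridDBConnectionCheck {
    public static void main(String[] args) throws Exception {
        // Placeholder values -- match these to your DBCPConnectionPool settings.
        String url = "jdbc:gs://127.0.0.1:20001/myCluster/public"; // host:port/cluster/database
        String user = "admin";
        String password = "admin";

        // Driver class shipped in gridstore-jdbc.jar.
        Class.forName("com.toshiba.mwcloud.gs.sql.Driver");

        try (Connection conn = DriverManager.getConnection(url, user, password)) {
            System.out.println("Connected to " + conn.getMetaData().getDatabaseProductName());
        }
    }
}
```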

GetFile

1_GetFile.png

The GetFile processor reads the CSV file from disk. All that needs to be configured is the directory from which the CSV will be read. Note that the CSV is deleted after being read, so it is best to create a new staging directory and copy files in as required.

PartitionRecord

2_PartitionRecord.png

The first PartitionRecord processor splits the records in the flow based on a column value. In the case of the New York Crime Complaint data, we're going to split the data by precinct, which is the ADDR_PCT_CD field. The processor works by setting the ${precinct} attribute to the value of the /ADDR_PCT_CD field; the downstream PutSQL and PutDatabaseRecord processors then use the ${precinct} attribute to determine the table name. This split allows us to put the data for each police precinct into its own table/container.
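Conceptually, this step is just a group-by on a single column. A rough Java sketch of the same idea, assuming a local rows.csv with no header row and ADDR_PCT_CD at a hypothetical column index, looks like this:

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PartitionByPrecinct {
    public static void main(String[] args) throws Exception {
        // Hypothetical column index for ADDR_PCT_CD; adjust to the real column order.
        // Note: a naive split on commas breaks on quoted fields -- Nifi's CSV reader handles that.
        int precinctColumn = 4;

        List<String> rows = Files.readAllLines(Path.of("rows.csv"));
        Map<String, List<String>> byPrecinct = rows.stream()
                .collect(Collectors.groupingBy(row -> row.split(",", -1)[precinctColumn]));

        // Each key ends up in its own table/container downstream.
        byPrecinct.forEach((precinct, records) ->
                System.out.println(precinct + ": " + records.size() + " rows"));
    }
}
```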

The RawCSVReader controller is used to read the Raw CSV from GetFile while the RawCSVRecordSetWriter re-writes the CSV for the next Processor in the chain.

101_RawCSVReader.png

102_RawCSVRecordSetWriter.png

PutSQL

4_PutSQL.png

We need to create the table for a precinct before PutDatabaseRecord can write; PutSQL is the best processor for that. It reads the ${precinct} attribute and executes the following SQL statement:

https://gist.github.com/16f49b4bbf35e1bad9a74f69f7832f1d
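Outside of Nifi, the equivalent step is simply building the table name from the precinct value and running the DDL over JDBC. The sketch below is illustrative only: the table name and trimmed column list are assumptions, and the exact statement used by the flow is in the gist above:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreatePrecinctTable {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:gs://127.0.0.1:20001/myCluster/public"; // placeholder, as before
        String precinct = "75"; // the value Nifi supplies through the ${precinct} attribute

        // Trimmed, hypothetical column list -- see the gist above for the real DDL.
        String ddl = "CREATE TABLE IF NOT EXISTS precinct_" + precinct + " ("
                + "CMPLNT_NUM INTEGER, "
                + "CMPLNT_FR TIMESTAMP, "
                + "CMPLNT_TO TIMESTAMP, "
                + "OFNS_DESC VARCHAR)";

        try (Connection conn = DriverManager.getConnection(url, "admin", "admin");
             Statement stmt = conn.createStatement()) {
            stmt.executeUpdate(ddl);
        }
    }
}
```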

PartitionRecord

3_PartitionRecord.png

The second PartitionRecord processor splits up the input so that each row of the CSV becomes one record. Records are still read and written by RawCSVReader and RawCSVRecordSetWriter respectively.

UpdateRecord

4_UpdateRecord.png

The first UpdateRecord processor changes the date format of the CSV from MM/dd/yyyy to yyyy-MM-dd, as that is the format expected for the timestamp conversion in the next processor.

The property updates used are:

https://gist.github.com/30ddec52f76617ae6214f8e4f0d549f7
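For reference, the same conversion in plain Java is a parse with one DateTimeFormatter and a format with another:

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class ReformatDate {
    public static void main(String[] args) {
        DateTimeFormatter source = DateTimeFormatter.ofPattern("MM/dd/yyyy"); // the CSV's original format
        DateTimeFormatter target = DateTimeFormatter.ofPattern("yyyy-MM-dd"); // format the next step expects

        String original = "02/11/2021"; // sample value
        String reformatted = LocalDate.parse(original, source).format(target);

        System.out.println(reformatted); // 2021-02-11
    }
}
```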

UpdateRecord

5_UpdateRecord.png

The second UpdateRecord processor combines the date and time fields and then converts the record to Avro by using the AvroRecordSetWriter as its output. Converting the record from CSV to Avro lets us turn a date string such as 2021-02-11 18:14:17 into an epoch value such as 1613067247 that the PutDatabaseRecord processor can write to the timestamp field.

The property updates used are:

https://gist.github.com/8782e911144a319ea4071e1ad3e3004e
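The equivalent conversion in plain Java looks like the following. The time zone is assumed to be UTC here, which may or may not match how the source data should be interpreted:

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class DateTimeToEpoch {
    public static void main(String[] args) {
        // Sample date and time values, with the date already reformatted by the previous step.
        String date = "2021-02-11";
        String time = "18:14:17";

        LocalDateTime combined = LocalDateTime.parse(date + " " + time,
                DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));

        // Assuming UTC: epoch seconds and milliseconds of the combined datetime.
        long epochSeconds = combined.toEpochSecond(ZoneOffset.UTC);
        long epochMillis = combined.toInstant(ZoneOffset.UTC).toEpochMilli();

        System.out.println(epochSeconds + " / " + epochMillis);
    }
}
```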

The AvroRecordSetWriter controller is configured as below:

103_AvroRecordSetWriter.png

PutDatabaseRecord

6_PutDatabaseRecord.png

Finally, the last processor, PutDatabaseRecord, writes the incoming records to GridDB using the DBCPConnectionPool controller. The records are read with the AvroReader controller, which is configured as shown:

104_AvroReader.png
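For reference, writing one of these records by hand over JDBC would look roughly like the sketch below. The table and column names follow the earlier hypothetical DDL, and the values are sample data rather than rows from the actual dataset:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Timestamp;

public class InsertComplaint {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:gs://127.0.0.1:20001/myCluster/public"; // placeholder, as before

        try (Connection conn = DriverManager.getConnection(url, "admin", "admin");
             PreparedStatement ps = conn.prepareStatement(
                     "INSERT INTO precinct_75 (CMPLNT_NUM, CMPLNT_FR, OFNS_DESC) VALUES (?, ?, ?)")) {

            ps.setInt(1, 123456789);                                // sample complaint number
            ps.setTimestamp(2, new Timestamp(1613067247L * 1000L)); // epoch seconds -> milliseconds
            ps.setString(3, "HARASSMENT");                          // sample offense description
            ps.executeUpdate();
        }
    }
}
```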

Conclusion

Finally, to start the ETL process, we copy the downloaded rows.csv into the input directory configured in the GetFile processor and monitor the load from the Nifi web interface.

https://gist.github.com/3bc9829b5ee56b441a8f89e5c045200f

This tutorial has demonstrated how to load a large CSV dataset into GridDB. With the extensibility of Nifi, it is possible to Extract, Transform, and Load nearly any data set from any source to any destination without writing complex scripts. You can download the Nifi Flow template used in this tutorial here.
