###Introduction to Apache NiFi

A very common scenario in many large organizations is to define, operationalize and manage complex dataflow between myriad distributed systems that often speak different protocols and understand different data formats. Messaging-based solutions are a popular answer these days, but they don’t address many of the fundamental challenges of enterprise dataflow.

###Data Workflow Scenario

Let's dive deeper into the dataflow requirement. On one end we have systems that acquire data, whether they are sensors, business applications, or organizations gathering data on your behalf. The collected information needs to be sent to processing and analytics systems such as Hadoop, Storm, and Spark, and ultimately persisted into a backing store where business users can apply analytics to the data at rest and derive business value.

Let's consider the scenario of IoT or remote sensor delivery. As data gets collected by remote sensors on factory floors, oil rigs, or travelling vehicles, it needs to traverse a very constrained communication pipe. A highly constrained pipe can mean low bandwidth, power limitations, unreliable connectivity, and so on.

The same challenges appear from data center to data center, and even within a single data center, where they show up as an ingest problem.

There is also a need for simple data processing: cases where you know something inherent about the data and can act on it in flight. It differs from complex event processing in that it does not involve time-windowed operations and the like.

###Challenges NiFi addresses

How do systems across the organization fit together, and how does the enterprise connect? That bigger picture matters. Operators need the ability to make immediate changes to the flow, while maintaining chain of custody of the data through the workflow for security and compliance: record every transfer, and authorize every access.

Ideal messaging system:

  • it has zero latency
  • it has perfect data durability
  • it supports unlimited consumers and producers

You give it a message and it is delivered.

But in practice, my system needs:

  • A different format and/or schema
  • to use a different protocol
  • the highest priority information first
  • large objects (event batches) vs. small objects (streams); Hadoop, for example, performs poorly when fed many small objects
  • authorization down to the data level (the topic level is too broad); not every system is interested in, or permitted to see, every bit of data
  • only interested in a subset of data on a topic
  • data needs to be enriched/sanitized before it arrives

The solution:

  • add new systems to handle the protocol differences
  • add new systems to convert the data
  • add new systems to reorder the data
  • add new systems to filter the unauthorized data
  • add new topics to represent 'stages of the flow'

The downside is increased latency, added complexity, and limited retention.

Who/What manages the bulk of the complexity?

  • the producers and the consumers
  • the operations team, hunting down log files

What is NiFi? NiFi is built to tackle this dataflow problem across the enterprise. Key concepts:

  • Interactive command and control
  • Data lineage
  • Platform built for extension (Flow Based Programming)
  • High-performance content and metadata repositories
╔═══════╦════════════════════════════════════════════════════╗
║OS/Host║                                                    ║
╠═══════╝                                                    ║
║       ╔════╦══════════════════════════════════════╗        ║
║       ║JVM ║          ┌────────────┐              ║        ║
║       ╠════╝          │ Web Server │              ║        ║
║       ║   ╔═══════════╩═══╦────────╩══════════╗   ║        ║
║       ║   ║Flow Controller║                   ║   ║        ║
║       ║   ╠═══════════════╝                   ║   ║        ║
║       ║   ║   ┌───────────┐   ┌───────────┐   ║   ║        ║
║       ║   ║   │Processor 1│   │Extension N│   ║   ║        ║
║       ║   ║   └───────────┘   └───────────┘   ║   ║        ║
║       ║   ╚═══════════════════════════════════╝   ║        ║
║       ║┌──────────┐   ┌──────────┐  ┌──────────┐  ║        ║
║       ╚╣FlowFile  ╠═══╣┌─────────┴╦═╣┌─────────┴╦═╝        ║
║        │Repository│   ││Content   │ ││Provenance│          ║
║        └──────────┘   └┤Repository│ └┤Repository│          ║
║                        └──────────┘  └──────────┘          ║
╚════════════════════════════════════════════════════════════╝

Each NiFi instance runs within a single JVM on its host, and extensions can be written in many languages. All interactions go through the web server, and the flow controller schedules and executes the processing tasks. The FlowFile repository holds metadata about each object in the flow: a FlowFile is like an HTTP header, while the content repository holds the body. The provenance repository holds the traceability information. All three repositories are resident on each node. We do not want to be making remote procedure calls to a third-party messaging system, because we are modifying this data constantly and asking questions of it constantly.
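To make the header/body split concrete, here is a rough sketch of a single FlowFile for one tweet; uuid, filename, and path are standard core attributes, and the values shown are placeholders:

FlowFile attributes (FlowFile repository):
  uuid     = <assigned by NiFi>
  filename = <generated name>
  path     = ./
FlowFile content (content repository):
  { "created_at": "...", "text": "...", "user": { "screen_name": "..." }, ... }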

Nor do we want to hold that data in memory whenever we pull it off a broker. This is why heap problems are so common with these systems today: messages are read off the queues into your system's heap space, manipulated, and written back. It is also the reason why, in all of these cases, you are encouraged to keep messages small.

NiFi's repository design does not force you to deal only with small messages.

With data sources producing more data over time, with the rise of big data and IoT, and with the continuous stream of new sources that enterprises are trying to capture, organizations need a way to architect, visualize, and monitor the dataflow: to watch the noise that becomes tomorrow's signal and drives the next decision. They also need to meet enterprise requirements such as encryption for security, and data quality enforced at read time rather than in a post-processing step that demands an extensive amount of time and resources.

Apache NiFi (recently acquired by Hortonworks) is a web-based dataflow management and transformation tool, with distinctive features such as configurable back pressure and a configurable latency-versus-throughput trade-off, which allow NiFi to tolerate failures in the network or disks, software crashes, or simply human mistakes.

A full description and user guide can be found here.

In this example, we will show how to build a simple Twitter streaming dataflow that collects tweets mentioning the word "hadoop" over time and pushes them as JSON files into HDFS.

Prerequisites:

  • Hortonworks Sandbox
  • 8 GB of RAM and preferably 4 processor cores
  • A Twitter developer account with OAuth tokens; follow the steps here to set one up if you don't have one

Installation:

Once you have downloaded and started the Hortonworks Sandbox, SSH into it. Once you are in, download and extract Apache NiFi using the following commands:

cd /hadoop
wget http://apache.uberglobalmirror.com//nifi/0.2.1/nifi-0.2.1-bin.tar.gz
tar -xvzf nifi-0.2.1-bin.tar.gz

Now that Apache NiFi is extracted, we need to change the HTTP web port from 8080 to 8089 so that it doesn't conflict with Ambari. You can do this by editing /hadoop/nifi-0.2.1/conf/nifi.properties and setting:

nifi.web.http.port=8089
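If you prefer to make the change from the shell, a one-line edit (a sketch; adjust the path if you extracted NiFi elsewhere) is:

sed -i 's/^nifi.web.http.port=8080/nifi.web.http.port=8089/' /hadoop/nifi-0.2.1/conf/nifi.properties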

Confirm that the installation succeeded by starting Apache NiFi and logging in to the web UI:

/hadoop/nifi-0.2.1/bin/nifi.sh start
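You can check whether the service actually came up with the same script's status subcommand (and later shut it down with stop):

/hadoop/nifi-0.2.1/bin/nifi.sh status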

Now point your browser to the IP address of the Sandbox followed by port 8089. In my setup it looks like the following:

http://172.16.61.130:8089/nifi

###Creating a Data Flow

To connect to Twitter, we could either create a whole dataflow from scratch or use one of the Twitter templates already available from the Apache NiFi templates page. From these templates we will download the Pull_from_Twitter_Garden_Hose.xml file and place it on your computer:

wget -O Pull_from_Twitter_Garden_Hose.xml 'https://cwiki.apache.org/confluence/download/attachments/57904847/Pull_from_Twitter_Garden_Hose.xml?version=1&modificationDate=1433234009000&api=v2'

Once the template is downloaded, go back to the web UI and add it by clicking the template button in the left-hand corner and browsing for the downloaded file.

Template Uploading

Now browse to the XML file downloaded in the previous step and upload it here.

Template Confirmation

You will see the template that you have installed marked in red. Once this step is completed, let's add the template to the workspace and start configuring the processors. You can do that by adding a template using the button in the top-right corner, as shown below.

Add Template

Choose Template

Once you add the template, you will end up with something like this:

Twitter Template

You can explore each processor yourself, but in a nutshell this is what each one does:

Grab Garden Hose: connects to Twitter and downloads tweets matching the provided search terms using the Twitter streaming API.

Pull Key Attributes: evaluates one or more expressions against each tweet, in our case SCREEN_NAME, TEXT, LANGUAGE, and USERID, and makes sure none of those values are NULL, since the tweet would not mean much to us otherwise. The routing is set to "Matched", which means only tweets matching all criteria are passed to the next processor.
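For reference, the JSON path expressions such a processor evaluates against the tweet payload might look roughly like this; the mapping shown is illustrative rather than copied from the template, with the paths on the right being the usual Twitter API tweet fields:

SCREEN_NAME = $.user.screen_name
TEXT        = $.text
LANGUAGE    = $.lang
USERID      = $.user.id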

Find only Tweets: filters out entries that have no body message, retweets, etc.

Copy of Tweets: an output port that copies the tweets into the Apache NiFi folder under $NIFI_HOME/content_repository.

Now, let's create a new processor that copies the data to HDFS. But before we do that, let's create the destination folder on HDFS using the following command:

hadoop fs -mkdir -p /nifi/tweets
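A quick sanity check that the directory was created:

hadoop fs -ls /nifi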

Now let's add the processor by clicking the processor button in the top-right corner.

Add Processor

Now, let's choose the PutHDFS processor; you can easily search for it in the search bar at the top.

putHDFS Processor

Connect the PutHDFS processor to the Find only Tweets processor and choose "tweet" as the relationship.


Now right-click on the PutHDFS processor and choose Configure. You need to decide how the processor terminates the flow; since this is the last processor in the flow and it won't pass any data further, tick all the auto-terminate boxes.


Then go to the Properties tab and add the locations of hdfs-site.xml and core-site.xml; they are usually under /etc/hadoop/2.3.0.0-2557/0/ (for example /etc/hadoop/2.3.0.0-2557/0/core-site.xml) if you are using the latest 2.3 Sandbox. Also, don't forget to set the destination directory to the folder we created earlier on HDFS.
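For reference, the key PutHDFS properties end up looking roughly like the following; Hadoop Configuration Resources and Directory are the usual PutHDFS property names, while the exact config file paths depend on your Sandbox build:

Hadoop Configuration Resources = /etc/hadoop/2.3.0.0-2557/0/core-site.xml,/etc/hadoop/2.3.0.0-2557/0/hdfs-site.xml
Directory                      = /nifi/tweets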


After this, you should see a red stop icon instead of a warning icon on the processor box; if not, check the error by hovering the cursor over the warning icon.

Let's configure the Twitter processor and add the consumer key and access token information. Notice that the consumer secret and the token secret are treated as sensitive values and won't be visible. Also make sure you change the Twitter endpoint to "Filter Endpoint", otherwise your search terms won't take effect.
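For reference, the relevant processor properties look roughly like the following; the property names shown are the usual GetTwitter ones, and the credential values are deliberately left as placeholders:

Twitter Endpoint    = Filter Endpoint
Consumer Key        = <your consumer key>
Consumer Secret     = <your consumer secret>
Access Token        = <your access token>
Access Token Secret = <your access token secret>
Terms to Filter On  = hadoop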


As you can see, we have added the word "hadoop" to the search terms. Once you are done, verify that the warning icon has disappeared from the box.

Now we are ready to start the flow. Simply right-click on each box and choose Start. You should start seeing counters and back-pressure information. After a while you can stop the flow and verify that JSON files are now stored in HDFS.
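A quick way to verify from the shell (adjust the path if you chose a different directory):

hadoop fs -ls /nifi/tweets
hadoop fs -cat /nifi/tweets/* | head -5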


You can always add more processors to encrypt, compress, or transform the files (to Avro or SequenceFile format, for example) before dropping them into HDFS. A complete list of the available processors can be found here.

This talk will outline the broad range of capabilities needed in an end-to-end dataflow strategy, and will discuss how Apache NiFi addresses some of them.

###Bio of the Speaker

Saptak Sen is a Technical Product Manager at Hortonworks. He has worked as a product manager and engineer on various distributed systems, including Hadoop, HDInsight, Azure, SQL Server, and Windows HPC Server, for the last ten years.
