A beginner's guide to Apache NiFi flows

Apache NiFi is a powerful dataflow management tool for any application that needs to move and process data. Thanks to its web-based graphical editor it is also very easy to use, and not just for programmers.

The basics

To understand the rest of this guide we first have to cover the basic concepts of NiFi. Dataflows in the system are built up from processors: these are the elements that gather, crunch, and forward data. NiFi ships with more than 180 processors out of the box, and you can also write your own. There are connectors for numerous database systems, and you can route data, send email, listen for HTTP calls, read and write files, manipulate content, and so on. Processors are placed on a canvas and tied together to create a dataflow graph. Each processor has output relationships such as success or failure, and you connect these to the appropriate downstream processors to build your custom flow. Processors also have a number of configurable properties; I won't go into all the details, only the settings necessary to achieve our results.

The pieces of data going through the system are wrapped in entities called FlowFiles. A FlowFile is a very simple concept: it carries the original data as its content, along with a set of attributes. Some of these attributes are set by default, and processors can add, remove, or edit them. Attributes will be important in the upcoming demonstration because routing and other logic is most easily built on top of them.
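
To make this concrete, here is a sketch of what a FlowFile might look like once it has been read from Kafka later in this guide. The uuid, filename, and path attributes are standard core attributes; the content shown is a made-up example of the sensor JSON we will meet below:

    content:
        {"rms1": 0.42, "rms_sum1": 1.7, "rms2": 0.39, "rms_sum2": 1.5, "timestamp": 1500000000}
    attributes:
        uuid        = a unique identifier assigned by NiFi
        filename    = a generated file name
        path        = ./
        kafka.topic = mport.1   (added by the Kafka consumer processor)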

The demo flow

Our demo case for today is an IoT application. In a smart building there are sensors on the AC units and the windows of an office. If the AC is running while a window is open we waste a lot of energy, so we want to log these situations, raise alarms, or simply shut the AC down automatically. In this case we receive the sensor data on two Apache Kafka topics in JSON format. We are going to save the data to HDFS in CSV format and also to Apache Cassandra.

Step 1

The first step is to receive the data from Kafka. NiFi has Kafka consumer processors, so this is really easy to do.
In a processor's configuration the bold properties are required: we set the addresses of some of our Kafka brokers, the security settings if we have those set up, and the topics we want to read. In our case mport.1 is the topic for the AC sensor and mport.2 is for the window sensor; I have added both.
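
As a rough sketch, the relevant properties of a ConsumeKafka processor could look like the following; the broker addresses and group id are placeholders, and the exact property labels vary slightly between NiFi versions:

    Kafka Brokers  : broker1:9092,broker2:9092
    Topic Name(s)  : mport.1,mport.2
    Group ID       : nifi-office-demo
    Offset Reset   : latest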

Step 2

Since the window and AC sensor formats are different, I have set up routing based on the topic the data came from; this way we can handle the data from the two sensors differently. For the routing I have used the RouteOnAttribute processor.
Fortunately the Kafka consumer sets the kafka.topic attribute, so I can use it here. To make it work I have selected the Route to Property name strategy, which lets me create new output relationships for this processor. I have created two: ac and window. To tell the processor which way each FlowFile should go we can use the NiFi expression language; for the details look up the documentation. For now I have created expressions that check whether the kafka.topic attribute equals mport.1 or mport.2.
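
Concretely, with Route to Property name as the routing strategy the two dynamic properties could look like this; the property name on the left becomes a new relationship, and the expression on the right decides which FlowFiles are routed to it:

    ac     : ${kafka.topic:equals('mport.1')}
    window : ${kafka.topic:equals('mport.2')}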

Step 3

Okay, so we have separated our flow. I'm going to show the path of the FlowFiles for the AC side of the system; the window side has exactly the same layout, just a slightly different data schema.
I have to convert the JSON to CSV to store it on HDFS, and also convert it to a CQL INSERT statement to be able to use the PutCassandra processor and store it in Cassandra. This kind of text manipulation is done most easily with the ReplaceText processor, but to use it we first have to extract the JSON properties into attributes. For this I have used the EvaluateJsonPath processor.
By setting the Destination property to flowfile-attribute I tell NiFi to create new attributes, and I can use JSONPath expressions to extract the values from the JSON content. After this processor is finished with a FlowFile, it will have five new attributes named rms_sum1, rms_sum2, rms1, rms2, and timestamp, with values taken from the JSON content.
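
Assuming incoming messages shaped like the sample below (the exact field layout is my assumption), the EvaluateJsonPath configuration simply maps one JSONPath expression to each new attribute:

    sample content:
        {"rms1": 0.42, "rms_sum1": 1.7, "rms2": 0.39, "rms_sum2": 1.5, "timestamp": 1500000000}

    Destination : flowfile-attribute
    rms1        : $.rms1
    rms2        : $.rms2
    rms_sum1    : $.rms_sum1
    rms_sum2    : $.rms_sum2
    timestamp   : $.timestamp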

Step 4 / a

After the attributes are set the flow splits, but the same data goes down both branches; we can simply connect the success relationship to multiple targets to achieve this. On the HDFS branch we first want to convert the JSON to CSV, and for this I have used the ReplaceText processor.
Basically, I select the entire content with the regular expression (?s)(^.*$) and replace it with the values of the attributes we set previously, ${'rms1'},${'rms_sum1'},${'rms2'},${'rms_sum2'}, separated by commas. I'm sure you have figured out that you can reference attribute values in the expression language using ${}.
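
Put together, the ReplaceText configuration for the CSV branch looks roughly like this, and with the sample message from earlier the FlowFile content would come out as a single CSV line:

    Replacement Strategy : Regex Replace
    Search Value         : (?s)(^.*$)
    Replacement Value    : ${'rms1'},${'rms_sum1'},${'rms2'},${'rms_sum2'}

    resulting content    : 0.42,1.7,0.39,1.5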

Step 5 / a

HDFS is made for storing big files, and the single lines we have are way too small to store efficiently, so I have dropped in a MergeContent processor to concatenate multiple FlowFiles together with \n as a separator.
It has many options, but our case is really simple. Keep in mind that the MergeContent processor will not produce output until the minimum number of entries or the minimum group size is reached. In a streaming-style flow this could be a problem, but since we are writing to HDFS for archiving and analytics it is not.
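
A minimal MergeContent setup could look like the following; the thresholds are arbitrary examples, so pick values that fit your ingest rate, and note that the Max Bin Age property can be used to force a bin out after a given time even if the thresholds have not been reached:

    Merge Strategy            : Bin-Packing Algorithm
    Minimum Number of Entries : 1000
    Maximum Number of Entries : 10000
    Delimiter Strategy        : Text
    Demarcator                : \n   (a literal newline)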

Step 6 / a

NiFi has a PutHDFS processor, so it is really easy to store data on this filesystem. We have to supply our hdfs-site.xml and core-site.xml files so NiFi knows how to connect to the cluster; other than that, only the path where the files should be put is required. The Directory property also supports the expression language, so you can use some logic to determine where to store the files.
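
As a sketch, with placeholder paths and a directory value that demonstrates the expression-language support:

    Hadoop Configuration Resources : /etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml
    Directory                      : /data/office/${kafka.topic}
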
With this step the HDFS storage part is complete; let's continue with Cassandra.

Step 4 / b

We use the same principles here as before. NiFi's PutCassandra processor expects CQL INSERT statements as the FlowFile content, so our JSON won't work as-is; we have to convert it to a CQL statement, and we can use the exact same method as for the CSV conversion.
With the ReplaceText processor we construct the CQL statement: the Search Value is again (?s)(^.*$), and the Replacement Value builds the INSERT from the attributes.
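
A sketch of what that Replacement Value could look like, assuming a hypothetical table office.ac_measurements with matching columns; the keyspace, table, and column names here are mine, so adjust them to your schema:

    INSERT INTO office.ac_measurements (ts, rms1, rms_sum1, rms2, rms_sum2)
    VALUES (${'timestamp'}, ${'rms1'}, ${'rms_sum1'}, ${'rms2'}, ${'rms_sum2'});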

Step 5 / b

The last step is very easy. I have created the Cassandra tables beforehand; that is necessary for this to work. Just set the contact points where the PutCassandra processor can find your Cassandra cluster, set your keyspace, and it is done.
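
The essential PutCassandra properties, as a rough sketch; the host and keyspace are placeholders, and the exact property labels may differ between NiFi versions:

    Cassandra Contact Points : cassandra1:9042
    Keyspace                 : office
    Consistency Level        : ONE
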
This was the flow for the AC side; for the window part we can copy this branch and reuse it with the altered data schema in EvaluateJsonPath and ReplaceText.

The final form of the flow, with the AC and window branches side by side, looks like this.

Final thoughts

As you can see, setting up a NiFi flow that stores incoming data in multiple formats and multiple database systems is very easy. Go in small steps, and you will be able to put together really powerful logic from these simple building blocks.

Further reading

Apache NiFi docs - https://nifi.apache.org/docs.html