Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save ijokarumawak/60b9ab2038ef906731ebf4c0eee97176 to your computer and use it in GitHub Desktop.
Save ijokarumawak/60b9ab2038ef906731ebf4c0eee97176 to your computer and use it in GitHub Desktop.
Example Zeppelin note book and NiFi flow.

Realtime Data Visualization with Zeppelin via NiFi WebSocket

This is an example realtime data visualization method using Zeppelin, NiFi and Kafka together.

Since NiFi 1.1.0, it provides components to act as a WebSocket server. This example contains a Zeppelin notebook paragraph which uses %angular interpreter to establish a WebSocket connection with NiFi, and draw a pie chart using D3, refreshed automatically as it received updated data from NiFi in realtime manner.

And also, a NiFi template to run a WebSocket server, which exposes latest data for the pie chart. The data can be passed through a Kafka topic.

%angular
<script>
var graphId = '#myGraph';
var w = 600;
var h = 400;
var r = Math.min(w, h) / 2;
var color = d3.scale.category20c();
var svg = d3.select(graphId).append('svg')
.attr('width', w)
.attr('height', h)
.append('g')
.attr('transform', 'translate(' + r + ',' + r + ')');
var pie = d3.layout.pie()
.sort(null)
.value(function(d) {return d.value});
var arc = d3.svg.arc()
.outerRadius(r);
var update = function(dataPoints) {
var arcs = svg.selectAll('.slice')
.data(pie(dataPoints));
// Add new slices if needed.
var g = arcs.enter()
.append('g')
.attr('class', 'slice');
g.append('path');
g.append('text')
.attr('text-anchor', 'middle');
// Update existing and added slices.
arcs.select('path')
.attr('fill', function(d, i){
return color(i);
})
.attr('d', function(d) {
return arc(d);
});
arcs.select('text')
.attr('transform', function(d) {
d.innerRadius = 0;
d.outerRadius = r;
return 'translate(' + arc.centroid(d) + ")";
})
.text(function(d){
return d.data.label;
});
// Remove exitted slices.
arcs.exit().remove();
}
var wsUri = "ws://localhost:9091/test";
websocket = new WebSocket(wsUri);
websocket.onopen = function(evt) {console.log('connected')};
websocket.onerror = function(evt) {console.log('ERR', evt)};
websocket.onmessage = function(evt) {
console.log(evt)
var dataPoints = JSON.parse(evt.data);
update(dataPoints);
};
</script>
<div id="myGraph">
</div>
<?xml version="1.0" ?>
<template encoding-version="1.0">
<description></description>
<groupId>f201cb4a-0158-1000-1640-0c438e67f63f</groupId>
<name>Zeppelin-WebSocket-Example</name>
<snippet>
<processGroups>
<id>f24f72e7-0158-1000-0000-000000000000</id>
<parentGroupId>f201cb4a-0158-1000-0000-000000000000</parentGroupId>
<position>
<x>449.5355995079349</x>
<y>0.0</y>
</position>
<comments></comments>
<contents>
<connections>
<id>f2266162-0158-1000-0000-000000000000</id>
<parentGroupId>f24f72e7-0158-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>f24f72e7-0158-1000-0000-000000000000</groupId>
<id>f226498e-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<source>
<groupId>f24f72e7-0158-1000-0000-000000000000</groupId>
<id>f2255c39-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>f22f5c60-0158-1000-0000-000000000000</id>
<parentGroupId>f24f72e7-0158-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>f24f72e7-0158-1000-0000-000000000000</groupId>
<id>f22f4b49-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>f24f72e7-0158-1000-0000-000000000000</groupId>
<id>f22e8a91-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>f22fcb12-0158-1000-0000-000000000000</id>
<parentGroupId>f24f72e7-0158-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>f24f72e7-0158-1000-0000-000000000000</groupId>
<id>f2255c39-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>f24f72e7-0158-1000-0000-000000000000</groupId>
<id>f22f4b49-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>f2390669-0158-1000-0000-000000000000</id>
<parentGroupId>f24f72e7-0158-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>f24f72e7-0158-1000-0000-000000000000</groupId>
<id>f23818dd-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>f24f72e7-0158-1000-0000-000000000000</groupId>
<id>f2389855-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>f2393dd4-0158-1000-0000-000000000000</id>
<parentGroupId>f24f72e7-0158-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>f24f72e7-0158-1000-0000-000000000000</groupId>
<id>f2389855-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>0</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>f24f72e7-0158-1000-0000-000000000000</groupId>
<id>f22e8a91-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>f23970ce-0158-1000-0000-000000000000</id>
<parentGroupId>f24f72e7-0158-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>f24f72e7-0158-1000-0000-000000000000</groupId>
<id>f226498e-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<source>
<groupId>f24f72e7-0158-1000-0000-000000000000</groupId>
<id>f23818dd-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<processors>
<id>f2255c39-0158-1000-0000-000000000000</id>
<parentGroupId>f24f72e7-0158-1000-0000-000000000000</parentGroupId>
<position>
<x>2674.5727421720485</x>
<y>709.2950298489031</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Cache Entry Identifier</key>
<value>
<name>Cache Entry Identifier</name>
</value>
</entry>
<entry>
<key>Distributed Cache Service</key>
<value>
<identifiesControllerService>org.apache.nifi.distributed.cache.client.DistributedMapCacheClient</identifiesControllerService>
<name>Distributed Cache Service</name>
</value>
</entry>
<entry>
<key>Cache update strategy</key>
<value>
<name>Cache update strategy</name>
</value>
</entry>
<entry>
<key>Max cache entry size</key>
<value>
<name>Max cache entry size</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Cache Entry Identifier</key>
<value>data</value>
</entry>
<entry>
<key>Distributed Cache Service</key>
<value>f225d67d-0158-1000-0000-000000000000</value>
</entry>
<entry>
<key>Cache update strategy</key>
<value>replace</value>
</entry>
<entry>
<key>Max cache entry size</key>
<value>1 MB</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Put latest data</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.PutDistributedMapCache</type>
</processors>
<processors>
<id>f226498e-0158-1000-0000-000000000000</id>
<parentGroupId>f24f72e7-0158-1000-0000-000000000000</parentGroupId>
<position>
<x>2947.2659821923653</x>
<y>955.5129513353336</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Delete Attributes Expression</key>
<value>
<name>Delete Attributes Expression</name>
</value>
</entry>
<entry>
<key>Store State</key>
<value>
<name>Store State</name>
</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
<value>
<name>Stateful Variables Initial Value</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Delete Attributes Expression</key>
</entry>
<entry>
<key>Store State</key>
<value>do not store state</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Terminate</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
<processors>
<id>f22e8a91-0158-1000-0000-000000000000</id>
<parentGroupId>f24f72e7-0158-1000-0000-000000000000</parentGroupId>
<position>
<x>2914.7343666033576</x>
<y>267.47245896681363</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Input Directory</key>
<value>
<name>Input Directory</name>
</value>
</entry>
<entry>
<key>Recurse Subdirectories</key>
<value>
<name>Recurse Subdirectories</name>
</value>
</entry>
<entry>
<key>Input Directory Location</key>
<value>
<name>Input Directory Location</name>
</value>
</entry>
<entry>
<key>File Filter</key>
<value>
<name>File Filter</name>
</value>
</entry>
<entry>
<key>Path Filter</key>
<value>
<name>Path Filter</name>
</value>
</entry>
<entry>
<key>Minimum File Age</key>
<value>
<name>Minimum File Age</name>
</value>
</entry>
<entry>
<key>Maximum File Age</key>
<value>
<name>Maximum File Age</name>
</value>
</entry>
<entry>
<key>Minimum File Size</key>
<value>
<name>Minimum File Size</name>
</value>
</entry>
<entry>
<key>Maximum File Size</key>
<value>
<name>Maximum File Size</name>
</value>
</entry>
<entry>
<key>Ignore Hidden Files</key>
<value>
<name>Ignore Hidden Files</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Input Directory</key>
<value>/tmp</value>
</entry>
<entry>
<key>Recurse Subdirectories</key>
<value>false</value>
</entry>
<entry>
<key>Input Directory Location</key>
<value>Local</value>
</entry>
<entry>
<key>File Filter</key>
<value>input.json</value>
</entry>
<entry>
<key>Path Filter</key>
</entry>
<entry>
<key>Minimum File Age</key>
<value>0 sec</value>
</entry>
<entry>
<key>Maximum File Age</key>
</entry>
<entry>
<key>Minimum File Size</key>
<value>0 B</value>
</entry>
<entry>
<key>Maximum File Size</key>
</entry>
<entry>
<key>Ignore Hidden Files</key>
<value>true</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>10 ms</yieldDuration>
</config>
<name>ListFile</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.ListFile</type>
</processors>
<processors>
<id>f22f4b49-0158-1000-0000-000000000000</id>
<parentGroupId>f24f72e7-0158-1000-0000-000000000000</parentGroupId>
<position>
<x>2667.3162301378</x>
<y>496.3898216748902</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>File to Fetch</key>
<value>
<name>File to Fetch</name>
</value>
</entry>
<entry>
<key>Completion Strategy</key>
<value>
<name>Completion Strategy</name>
</value>
</entry>
<entry>
<key>Move Destination Directory</key>
<value>
<name>Move Destination Directory</name>
</value>
</entry>
<entry>
<key>Move Conflict Strategy</key>
<value>
<name>Move Conflict Strategy</name>
</value>
</entry>
<entry>
<key>Log level when file not found</key>
<value>
<name>Log level when file not found</name>
</value>
</entry>
<entry>
<key>Log level when permission denied</key>
<value>
<name>Log level when permission denied</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>File to Fetch</key>
<value>${absolute.path}/${filename}</value>
</entry>
<entry>
<key>Completion Strategy</key>
<value>None</value>
</entry>
<entry>
<key>Move Destination Directory</key>
</entry>
<entry>
<key>Move Conflict Strategy</key>
<value>Rename</value>
</entry>
<entry>
<key>Log level when file not found</key>
<value>ERROR</value>
</entry>
<entry>
<key>Log level when permission denied</key>
<value>ERROR</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>FetchFile</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>not.found</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>permission.denied</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.FetchFile</type>
</processors>
<processors>
<id>f23818dd-0158-1000-0000-000000000000</id>
<parentGroupId>f24f72e7-0158-1000-0000-000000000000</parentGroupId>
<position>
<x>3156.1823613126735</x>
<y>707.4355327785906</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Cache Entry Identifier</key>
<value>
<name>Cache Entry Identifier</name>
</value>
</entry>
<entry>
<key>Distributed Cache Service</key>
<value>
<identifiesControllerService>org.apache.nifi.distributed.cache.client.DistributedMapCacheClient</identifiesControllerService>
<name>Distributed Cache Service</name>
</value>
</entry>
<entry>
<key>Cache update strategy</key>
<value>
<name>Cache update strategy</name>
</value>
</entry>
<entry>
<key>Max cache entry size</key>
<value>
<name>Max cache entry size</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Cache Entry Identifier</key>
<value>updated</value>
</entry>
<entry>
<key>Distributed Cache Service</key>
<value>f225d67d-0158-1000-0000-000000000000</value>
</entry>
<entry>
<key>Cache update strategy</key>
<value>replace</value>
</entry>
<entry>
<key>Max cache entry size</key>
<value>1 MB</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Put latest data timestamp</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.PutDistributedMapCache</type>
</processors>
<processors>
<id>f2389855-0158-1000-0000-000000000000</id>
<parentGroupId>f24f72e7-0158-1000-0000-000000000000</parentGroupId>
<position>
<x>3157.7845285451194</x>
<y>500.3066329964257</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Regular Expression</key>
<value>
<name>Regular Expression</name>
</value>
</entry>
<entry>
<key>Replacement Value</key>
<value>
<name>Replacement Value</name>
</value>
</entry>
<entry>
<key>Character Set</key>
<value>
<name>Character Set</name>
</value>
</entry>
<entry>
<key>Maximum Buffer Size</key>
<value>
<name>Maximum Buffer Size</name>
</value>
</entry>
<entry>
<key>Replacement Strategy</key>
<value>
<name>Replacement Strategy</name>
</value>
</entry>
<entry>
<key>Evaluation Mode</key>
<value>
<name>Evaluation Mode</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Regular Expression</key>
<value>(?s)(^.*$)</value>
</entry>
<entry>
<key>Replacement Value</key>
<value>${now():toNumber()}</value>
</entry>
<entry>
<key>Character Set</key>
<value>UTF-8</value>
</entry>
<entry>
<key>Maximum Buffer Size</key>
<value>1 MB</value>
</entry>
<entry>
<key>Replacement Strategy</key>
<value>Always Replace</value>
</entry>
<entry>
<key>Evaluation Mode</key>
<value>Entire text</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Set current timestamp</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.ReplaceText</type>
</processors>
</contents>
<name>UpdateData via File</name>
</processGroups>
<processGroups>
<id>f2529106-0158-1000-0000-000000000000</id>
<parentGroupId>f201cb4a-0158-1000-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>108.92538095738507</y>
</position>
<comments></comments>
<contents>
<connections>
<id>f20323b2-0158-1000-0000-000000000000</id>
<parentGroupId>f2529106-0158-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>f2529106-0158-1000-0000-000000000000</groupId>
<id>f2286acf-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>0</labelIndex>
<name></name>
<selectedRelationships>connected</selectedRelationships>
<source>
<groupId>f2529106-0158-1000-0000-000000000000</groupId>
<id>f202550f-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>f204cb00-0158-1000-0000-000000000000</id>
<parentGroupId>f2529106-0158-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>f2529106-0158-1000-0000-000000000000</groupId>
<id>f20307f1-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>30 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<source>
<groupId>f2529106-0158-1000-0000-000000000000</groupId>
<id>f2049ada-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>f204da55-0158-1000-0000-000000000000</id>
<parentGroupId>f2529106-0158-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>f2529106-0158-1000-0000-000000000000</groupId>
<id>f2329c63-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>0</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>f2529106-0158-1000-0000-000000000000</groupId>
<id>f2049ada-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>f2291964-0158-1000-0000-000000000000</id>
<parentGroupId>f2529106-0158-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>f2529106-0158-1000-0000-000000000000</groupId>
<id>f2049ada-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>f2529106-0158-1000-0000-000000000000</groupId>
<id>f2286acf-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>f233ad72-0158-1000-0000-000000000000</id>
<parentGroupId>f2529106-0158-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>f2529106-0158-1000-0000-000000000000</groupId>
<id>f23a4a50-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>f2529106-0158-1000-0000-000000000000</groupId>
<id>f2329c63-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>f23aa8de-0158-1000-0000-000000000000</id>
<parentGroupId>f2529106-0158-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>2555.90283203125</x>
<y>298.4917907714844</y>
</bends>
<destination>
<groupId>f2529106-0158-1000-0000-000000000000</groupId>
<id>f23c4eff-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>0</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>f2529106-0158-1000-0000-000000000000</groupId>
<id>f23a4a50-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>f23d4e37-0158-1000-0000-000000000000</id>
<parentGroupId>f2529106-0158-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>2247.765625</x>
<y>469.3678283691406</y>
</bends>
<destination>
<groupId>f2529106-0158-1000-0000-000000000000</groupId>
<id>f23a4a50-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>0</labelIndex>
<name></name>
<selectedRelationships>unmatched</selectedRelationships>
<source>
<groupId>f2529106-0158-1000-0000-000000000000</groupId>
<id>f23c4eff-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>f23d8aa1-0158-1000-0000-000000000000</id>
<parentGroupId>f2529106-0158-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>2051.678466796875</x>
<y>407.74041748046875</y>
</bends>
<destination>
<groupId>f2529106-0158-1000-0000-000000000000</groupId>
<id>f2286acf-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>0</labelIndex>
<name></name>
<selectedRelationships>new</selectedRelationships>
<source>
<groupId>f2529106-0158-1000-0000-000000000000</groupId>
<id>f23c4eff-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<controllerServices>
<id>f2028342-0158-1000-0000-000000000000</id>
<parentGroupId>f201cb4a-0158-1000-0000-000000000000</parentGroupId>
<comments></comments>
<descriptors>
<entry>
<key>input-buffer-size</key>
<value>
<name>input-buffer-size</name>
</value>
</entry>
<entry>
<key>max-text-message-size</key>
<value>
<name>max-text-message-size</name>
</value>
</entry>
<entry>
<key>max-binary-message-size</key>
<value>
<name>max-binary-message-size</name>
</value>
</entry>
<entry>
<key>listen-port</key>
<value>
<name>listen-port</name>
</value>
</entry>
<entry>
<key>ssl-context-service</key>
<value>
<identifiesControllerService>org.apache.nifi.ssl.SSLContextService</identifiesControllerService>
<name>ssl-context-service</name>
</value>
</entry>
<entry>
<key>client-authentication</key>
<value>
<name>client-authentication</name>
</value>
</entry>
</descriptors>
<name>JettyWebSocketServer</name>
<properties>
<entry>
<key>input-buffer-size</key>
</entry>
<entry>
<key>max-text-message-size</key>
</entry>
<entry>
<key>max-binary-message-size</key>
</entry>
<entry>
<key>listen-port</key>
<value>9091</value>
</entry>
<entry>
<key>ssl-context-service</key>
</entry>
<entry>
<key>client-authentication</key>
</entry>
</properties>
<state>ENABLED</state>
<type>org.apache.nifi.websocket.jetty.JettyWebSocketServer</type>
</controllerServices>
<processors>
<id>f202550f-0158-1000-0000-000000000000</id>
<parentGroupId>f2529106-0158-1000-0000-000000000000</parentGroupId>
<position>
<x>1690.317720978921</x>
<y>27.642636677128365</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>websocket-server-controller-service</key>
<value>
<identifiesControllerService>org.apache.nifi.websocket.WebSocketServerService</identifiesControllerService>
<name>websocket-server-controller-service</name>
</value>
</entry>
<entry>
<key>server-url-path</key>
<value>
<name>server-url-path</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>websocket-server-controller-service</key>
<value>f2028342-0158-1000-0000-000000000000</value>
</entry>
<entry>
<key>server-url-path</key>
<value>/test</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>ListenWebSocket</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>binary message</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>connected</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>text message</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.websocket.ListenWebSocket</type>
</processors>
<processors>
<id>f20307f1-0158-1000-0000-000000000000</id>
<parentGroupId>f2529106-0158-1000-0000-000000000000</parentGroupId>
<position>
<x>1688.5395417626778</x>
<y>843.8004269212711</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Delete Attributes Expression</key>
<value>
<name>Delete Attributes Expression</name>
</value>
</entry>
<entry>
<key>Store State</key>
<value>
<name>Store State</name>
</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
<value>
<name>Stateful Variables Initial Value</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Delete Attributes Expression</key>
</entry>
<entry>
<key>Store State</key>
<value>do not store state</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Terminate</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
<processors>
<id>f2049ada-0158-1000-0000-000000000000</id>
<parentGroupId>f2529106-0158-1000-0000-000000000000</parentGroupId>
<position>
<x>1690.339179126679</x>
<y>616.7110104749274</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>websocket-session-id</key>
<value>
<name>websocket-session-id</name>
</value>
</entry>
<entry>
<key>websocket-controller-service-id</key>
<value>
<name>websocket-controller-service-id</name>
</value>
</entry>
<entry>
<key>websocket-endpoint-id</key>
<value>
<name>websocket-endpoint-id</name>
</value>
</entry>
<entry>
<key>websocket-message-type</key>
<value>
<name>websocket-message-type</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>websocket-session-id</key>
<value>${websocket.session.id}</value>
</entry>
<entry>
<key>websocket-controller-service-id</key>
<value>${websocket.controller.service.id}</value>
</entry>
<entry>
<key>websocket-endpoint-id</key>
<value>${websocket.endpoint.id}</value>
</entry>
<entry>
<key>websocket-message-type</key>
<value>TEXT</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>PutWebSocket</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.websocket.PutWebSocket</type>
</processors>
<processors>
<id>f2286acf-0158-1000-0000-000000000000</id>
<parentGroupId>f2529106-0158-1000-0000-000000000000</parentGroupId>
<position>
<x>1689.4703961369883</x>
<y>233.84496901008492</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Cache Entry Identifier</key>
<value>
<name>Cache Entry Identifier</name>
</value>
</entry>
<entry>
<key>Distributed Cache Service</key>
<value>
<identifiesControllerService>org.apache.nifi.distributed.cache.client.DistributedMapCacheClient</identifiesControllerService>
<name>Distributed Cache Service</name>
</value>
</entry>
<entry>
<key>Put Cache Value In Attribute</key>
<value>
<name>Put Cache Value In Attribute</name>
</value>
</entry>
<entry>
<key>Max Length To Put In Attribute</key>
<value>
<name>Max Length To Put In Attribute</name>
</value>
</entry>
<entry>
<key>Character Set</key>
<value>
<name>Character Set</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Cache Entry Identifier</key>
<value>data</value>
</entry>
<entry>
<key>Distributed Cache Service</key>
<value>f225d67d-0158-1000-0000-000000000000</value>
</entry>
<entry>
<key>Put Cache Value In Attribute</key>
</entry>
<entry>
<key>Max Length To Put In Attribute</key>
<value>256</value>
</entry>
<entry>
<key>Character Set</key>
<value>UTF-8</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Fetch latest data</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>not-found</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.FetchDistributedMapCache</type>
</processors>
<processors>
<id>f2329c63-0158-1000-0000-000000000000</id>
<parentGroupId>f2529106-0158-1000-0000-000000000000</parentGroupId>
<position>
<x>2383.009225500612</x>
<y>617.4249807605195</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Delete Attributes Expression</key>
<value>
<name>Delete Attributes Expression</name>
</value>
</entry>
<entry>
<key>Store State</key>
<value>
<name>Store State</name>
</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
<value>
<name>Stateful Variables Initial Value</name>
</value>
</entry>
<entry>
<key>sent</key>
<value>
<name>sent</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Delete Attributes Expression</key>
</entry>
<entry>
<key>Store State</key>
<value>do not store state</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
</entry>
<entry>
<key>sent</key>
<value>${now():toNumber()}</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Update sent timestamp</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
<processors>
<id>f23a4a50-0158-1000-0000-000000000000</id>
<parentGroupId>f2529106-0158-1000-0000-000000000000</parentGroupId>
<position>
<x>2380.9163189885508</x>
<y>404.2394697425068</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Cache Entry Identifier</key>
<value>
<name>Cache Entry Identifier</name>
</value>
</entry>
<entry>
<key>Distributed Cache Service</key>
<value>
<identifiesControllerService>org.apache.nifi.distributed.cache.client.DistributedMapCacheClient</identifiesControllerService>
<name>Distributed Cache Service</name>
</value>
</entry>
<entry>
<key>Put Cache Value In Attribute</key>
<value>
<name>Put Cache Value In Attribute</name>
</value>
</entry>
<entry>
<key>Max Length To Put In Attribute</key>
<value>
<name>Max Length To Put In Attribute</name>
</value>
</entry>
<entry>
<key>Character Set</key>
<value>
<name>Character Set</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Cache Entry Identifier</key>
<value>updated</value>
</entry>
<entry>
<key>Distributed Cache Service</key>
<value>f225d67d-0158-1000-0000-000000000000</value>
</entry>
<entry>
<key>Put Cache Value In Attribute</key>
<value>updated</value>
</entry>
<entry>
<key>Max Length To Put In Attribute</key>
<value>256</value>
</entry>
<entry>
<key>Character Set</key>
<value>UTF-8</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Fetch latest timestamp</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>not-found</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.FetchDistributedMapCache</type>
</processors>
<processors>
<id>f23c4eff-0158-1000-0000-000000000000</id>
<parentGroupId>f2529106-0158-1000-0000-000000000000</parentGroupId>
<position>
<x>2072.5294577701034</x>
<y>233.13441721863472</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Routing Strategy</key>
<value>
<name>Routing Strategy</name>
</value>
</entry>
<entry>
<key>new</key>
<value>
<name>new</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Routing Strategy</key>
<value>Route to Property name</value>
</entry>
<entry>
<key>new</key>
<value>${updated:gt(${sent})}</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Is new message?</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>new</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>unmatched</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.RouteOnAttribute</type>
</processors>
</contents>
<name>WebSocket Notification</name>
</processGroups>
<processGroups>
<id>f25939d2-0158-1000-0000-000000000000</id>
<parentGroupId>f201cb4a-0158-1000-0000-000000000000</parentGroupId>
<position>
<x>454.5684066447866</x>
<y>211.15501794585634</y>
</position>
<comments></comments>
<contents>
<connections>
<id>f2598ffb-0158-1000-0000-000000000000</id>
<parentGroupId>f25939d2-0158-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>3146.227783203125</x>
<y>501.2459716796875</y>
</bends>
<bends>
<x>3146.227783203125</x>
<y>907.3923950195312</y>
</bends>
<destination>
<groupId>f25939d2-0158-1000-0000-000000000000</groupId>
<id>f2598ff6-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<source>
<groupId>f25939d2-0158-1000-0000-000000000000</groupId>
<id>f2598ff5-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>f2598ffe-0158-1000-0000-000000000000</id>
<parentGroupId>f25939d2-0158-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>3657.97216796875</x>
<y>726.6572265625</y>
</bends>
<destination>
<groupId>f25939d2-0158-1000-0000-000000000000</groupId>
<id>f2598ff9-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>0</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>f25939d2-0158-1000-0000-000000000000</groupId>
<id>f2598ffa-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>f2599000-0158-1000-0000-000000000000</id>
<parentGroupId>f25939d2-0158-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>3356.408447265625</x>
<y>908.40771484375</y>
</bends>
<destination>
<groupId>f25939d2-0158-1000-0000-000000000000</groupId>
<id>f2598ff6-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>0</labelIndex>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<source>
<groupId>f25939d2-0158-1000-0000-000000000000</groupId>
<id>f2598ff9-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>f25ab3fa-0158-1000-0000-000000000000</id>
<parentGroupId>f25939d2-0158-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>f25939d2-0158-1000-0000-000000000000</groupId>
<id>f2598ff5-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>f25939d2-0158-1000-0000-000000000000</groupId>
<id>f25a4da7-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>f25b1b9a-0158-1000-0000-000000000000</id>
<parentGroupId>f25939d2-0158-1000-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>3655.94140625</x>
<y>569.2754516601562</y>
</bends>
<destination>
<groupId>f25939d2-0158-1000-0000-000000000000</groupId>
<id>f2598ffa-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>0</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>f25939d2-0158-1000-0000-000000000000</groupId>
<id>f2598ff5-0158-1000-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<controllerServices>
<id>f225d67d-0158-1000-0000-000000000000</id>
<parentGroupId>f201cb4a-0158-1000-0000-000000000000</parentGroupId>
<comments></comments>
<descriptors>
<entry>
<key>Server Hostname</key>
<value>
<name>Server Hostname</name>
</value>
</entry>
<entry>
<key>Server Port</key>
<value>
<name>Server Port</name>
</value>
</entry>
<entry>
<key>SSL Context Service</key>
<value>
<identifiesControllerService>org.apache.nifi.ssl.SSLContextService</identifiesControllerService>
<name>SSL Context Service</name>
</value>
</entry>
<entry>
<key>Communications Timeout</key>
<value>
<name>Communications Timeout</name>
</value>
</entry>
</descriptors>
<name>DistributedMapCacheClientService</name>
<properties>
<entry>
<key>Server Hostname</key>
<value>localhost</value>
</entry>
<entry>
<key>Server Port</key>
</entry>
<entry>
<key>SSL Context Service</key>
</entry>
<entry>
<key>Communications Timeout</key>
</entry>
</properties>
<state>ENABLED</state>
<type>org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService</type>
</controllerServices>
<processors>
<id>f2598ff5-0158-1000-0000-000000000000</id>
<parentGroupId>f25939d2-0158-1000-0000-000000000000</parentGroupId>
<position>
<x>3181.8871952970485</x>
<y>437.043534487575</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Cache Entry Identifier</key>
<value>
<name>Cache Entry Identifier</name>
</value>
</entry>
<entry>
<key>Distributed Cache Service</key>
<value>
<identifiesControllerService>org.apache.nifi.distributed.cache.client.DistributedMapCacheClient</identifiesControllerService>
<name>Distributed Cache Service</name>
</value>
</entry>
<entry>
<key>Cache update strategy</key>
<value>
<name>Cache update strategy</name>
</value>
</entry>
<entry>
<key>Max cache entry size</key>
<value>
<name>Max cache entry size</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Cache Entry Identifier</key>
<value>data</value>
</entry>
<entry>
<key>Distributed Cache Service</key>
<value>f225d67d-0158-1000-0000-000000000000</value>
</entry>
<entry>
<key>Cache update strategy</key>
<value>replace</value>
</entry>
<entry>
<key>Max cache entry size</key>
<value>1 MB</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Put latest data</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.PutDistributedMapCache</type>
</processors>
<processors>
<id>f2598ff6-0158-1000-0000-000000000000</id>
<parentGroupId>f25939d2-0158-1000-0000-000000000000</parentGroupId>
<position>
<x>3059.5875153954903</x>
<y>977.4669308275211</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Delete Attributes Expression</key>
<value>
<name>Delete Attributes Expression</name>
</value>
</entry>
<entry>
<key>Store State</key>
<value>
<name>Store State</name>
</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
<value>
<name>Stateful Variables Initial Value</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Delete Attributes Expression</key>
</entry>
<entry>
<key>Store State</key>
<value>do not store state</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Terminate</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
<processors>
<id>f2598ff9-0158-1000-0000-000000000000</id>
<parentGroupId>f25939d2-0158-1000-0000-000000000000</parentGroupId>
<position>
<x>3181.1823613126735</x>
<y>732.4355327785906</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Cache Entry Identifier</key>
<value>
<name>Cache Entry Identifier</name>
</value>
</entry>
<entry>
<key>Distributed Cache Service</key>
<value>
<identifiesControllerService>org.apache.nifi.distributed.cache.client.DistributedMapCacheClient</identifiesControllerService>
<name>Distributed Cache Service</name>
</value>
</entry>
<entry>
<key>Cache update strategy</key>
<value>
<name>Cache update strategy</name>
</value>
</entry>
<entry>
<key>Max cache entry size</key>
<value>
<name>Max cache entry size</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Cache Entry Identifier</key>
<value>updated</value>
</entry>
<entry>
<key>Distributed Cache Service</key>
<value>f225d67d-0158-1000-0000-000000000000</value>
</entry>
<entry>
<key>Cache update strategy</key>
<value>replace</value>
</entry>
<entry>
<key>Max cache entry size</key>
<value>1 MB</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Put latest data timestamp</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.PutDistributedMapCache</type>
</processors>
<processors>
<id>f2598ffa-0158-1000-0000-000000000000</id>
<parentGroupId>f25939d2-0158-1000-0000-000000000000</parentGroupId>
<position>
<x>3183.8921945607444</x>
<y>585.305473328457</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Regular Expression</key>
<value>
<name>Regular Expression</name>
</value>
</entry>
<entry>
<key>Replacement Value</key>
<value>
<name>Replacement Value</name>
</value>
</entry>
<entry>
<key>Character Set</key>
<value>
<name>Character Set</name>
</value>
</entry>
<entry>
<key>Maximum Buffer Size</key>
<value>
<name>Maximum Buffer Size</name>
</value>
</entry>
<entry>
<key>Replacement Strategy</key>
<value>
<name>Replacement Strategy</name>
</value>
</entry>
<entry>
<key>Evaluation Mode</key>
<value>
<name>Evaluation Mode</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Regular Expression</key>
<value>(?s)(^.*$)</value>
</entry>
<entry>
<key>Replacement Value</key>
<value>${now():toNumber()}</value>
</entry>
<entry>
<key>Character Set</key>
<value>UTF-8</value>
</entry>
<entry>
<key>Maximum Buffer Size</key>
<value>1 MB</value>
</entry>
<entry>
<key>Replacement Strategy</key>
<value>Always Replace</value>
</entry>
<entry>
<key>Evaluation Mode</key>
<value>Entire text</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Set current timestamp</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.ReplaceText</type>
</processors>
<processors>
<id>f25a4da7-0158-1000-0000-000000000000</id>
<parentGroupId>f25939d2-0158-1000-0000-000000000000</parentGroupId>
<position>
<x>3179.7434666276586</x>
<y>239.67822193417828</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>bootstrap.servers</key>
<value>
<name>bootstrap.servers</name>
</value>
</entry>
<entry>
<key>security.protocol</key>
<value>
<name>security.protocol</name>
</value>
</entry>
<entry>
<key>sasl.kerberos.service.name</key>
<value>
<name>sasl.kerberos.service.name</name>
</value>
</entry>
<entry>
<key>ssl.context.service</key>
<value>
<identifiesControllerService>org.apache.nifi.ssl.SSLContextService</identifiesControllerService>
<name>ssl.context.service</name>
</value>
</entry>
<entry>
<key>topic</key>
<value>
<name>topic</name>
</value>
</entry>
<entry>
<key>group.id</key>
<value>
<name>group.id</name>
</value>
</entry>
<entry>
<key>auto.offset.reset</key>
<value>
<name>auto.offset.reset</name>
</value>
</entry>
<entry>
<key>key-attribute-encoding</key>
<value>
<name>key-attribute-encoding</name>
</value>
</entry>
<entry>
<key>message-demarcator</key>
<value>
<name>message-demarcator</name>
</value>
</entry>
<entry>
<key>max.poll.records</key>
<value>
<name>max.poll.records</name>
</value>
</entry>
<entry>
<key>max-uncommit-offset-wait</key>
<value>
<name>max-uncommit-offset-wait</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>bootstrap.servers</key>
<value>localhost:9092</value>
</entry>
<entry>
<key>security.protocol</key>
<value>PLAINTEXT</value>
</entry>
<entry>
<key>sasl.kerberos.service.name</key>
</entry>
<entry>
<key>ssl.context.service</key>
</entry>
<entry>
<key>topic</key>
<value>test</value>
</entry>
<entry>
<key>group.id</key>
<value>nifi</value>
</entry>
<entry>
<key>auto.offset.reset</key>
<value>latest</value>
</entry>
<entry>
<key>key-attribute-encoding</key>
<value>utf-8</value>
</entry>
<entry>
<key>message-demarcator</key>
</entry>
<entry>
<key>max.poll.records</key>
<value>10000</value>
</entry>
<entry>
<key>max-uncommit-offset-wait</key>
<value>1 secs</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>10 ms</yieldDuration>
</config>
<name>ConsumeKafka_0_10</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_0_10</type>
</processors>
</contents>
<name>UpdateData via Kafka</name>
</processGroups>
</snippet>
<timestamp>12/12/2016 19:30:31 JST</timestamp>
</template>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment