Skip to content

Instantly share code, notes, and snippets.

@sukram42
Last active August 31, 2021 07:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sukram42/f635344c93f76173a36646c83286a830 to your computer and use it in GitHub Desktop.
Save sukram42/f635344c93f76173a36646c83286a830 to your computer and use it in GitHub Desktop.
NiFi Execute Processor -> Access Distributed Map Cache Server
'''
Script to put and get values from a distributed map cache in APACHE NIFI.
Only works when the id of the Distributed map cache client service is put into a property called "clientServiceId".
Sources:
https://www.javadoc.io/doc/org.apache.nifi/nifi-distributed-cache-client-service-api/1.7.0/org/apache/nifi/distributed/cache/client/AtomicDistributedMapCacheClient.html
https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-3/ta-p/249148
'''
import json
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import StreamCallback
from java.nio.charset import StandardCharsets
from org.python.core.util import StringUtil
from org.apache.nifi.distributed.cache.client import DistributedMapCacheClient, Serializer, Deserializer
class TestClass(StreamCallback):
def __init__(self):
super(TestClass, self).__init__()
self.distributedMapCache = DistributedMapCache()
def process(self, inputStream, outputStream):
"""
Processing of the flow File: Write and read a value in the Distributed Map Cache
"""
self.distributedMapCache.put("hallo", "test")
log.info(self.distributedMapCache.get('hallo'))
class DistributedMapCache:
"""
Class to access the Distributed Map Cache Client Server
"""
def __init__(self):
self.client = clientServiceId.asControllerService(DistributedMapCacheClient)
def get(self, key):
result = self.client.get(key, StringSerializer(), StringDeserializer())
return str(result)
def put(self, key, value):
self.client.put(key, value, StringSerializer(), StringSerializer())
class StringSerializer(Serializer):
def __init__(self):
pass
def serialize(self, value, out):
out.write(value)
class StringDeserializer(Deserializer):
def __init__(self):
pass
def deserialize(self, bytes):
return StringUtil.fromBytes(bytes)
flowFile = session.get()
if (flowFile != None):
flowFile = session.write(flowFile, TestClass())
session.transfer(flowFile, REL_SUCCESS)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment