Last active
August 31, 2021 07:17
-
-
Save sukram42/f635344c93f76173a36646c83286a830 to your computer and use it in GitHub Desktop.
NiFi Execute Processor -> Access Distributed Map Cache Server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''
Script to put and get values from a Distributed Map Cache in Apache NiFi.
It only works when the ID of the Distributed Map Cache client service is set in a property named "clientServiceId".
Sources:
https://www.javadoc.io/doc/org.apache.nifi/nifi-distributed-cache-client-service-api/1.7.0/org/apache/nifi/distributed/cache/client/AtomicDistributedMapCacheClient.html
https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-3/ta-p/249148
'''
import json | |
from org.apache.commons.io import IOUtils | |
from org.apache.nifi.processor.io import StreamCallback | |
from java.nio.charset import StandardCharsets | |
from org.python.core.util import StringUtil | |
from org.apache.nifi.distributed.cache.client import DistributedMapCacheClient, Serializer, Deserializer | |
class TestClass(StreamCallback):
    """Stream callback that writes and reads one entry in the map cache.

    An instance is handed to ``session.write``; NiFi then invokes
    ``process`` with the flow file's input and output streams.
    """

    def __init__(self):
        super(TestClass, self).__init__()
        # Wrapper around the cache client controller service.
        self.distributedMapCache = DistributedMapCache()

    def process(self, inputStream, outputStream):
        """
        Processing of the flow file: write and read a value in the
        Distributed Map Cache, then pass the content through unchanged.

        inputStream: stream over the current flow file content.
        outputStream: stream that receives the new flow file content.
        """
        self.distributedMapCache.put("hallo", "test")
        log.info(self.distributedMapCache.get('hallo'))
        # session.write() replaces the flow file content with whatever is
        # written to outputStream. Without this copy the outgoing flow file
        # would be empty, silently dropping the original content.
        IOUtils.copy(inputStream, outputStream)
class DistributedMapCache:
    """
    Thin wrapper around the Distributed Map Cache client controller service.

    The client is resolved from the ``clientServiceId`` script variable, so
    that name must be available in the script's scope when an instance is
    created.
    """

    def __init__(self):
        # Resolve the property value to the actual controller service.
        self.client = clientServiceId.asControllerService(DistributedMapCacheClient)

    def get(self, key):
        """Look up ``key`` in the cache and return the result as ``str``."""
        key_serializer = StringSerializer()
        value_deserializer = StringDeserializer()
        cached = self.client.get(key, key_serializer, value_deserializer)
        return str(cached)

    def put(self, key, value):
        """Store ``value`` under ``key`` in the cache."""
        key_serializer = StringSerializer()
        value_serializer = StringSerializer()
        self.client.put(key, value, key_serializer, value_serializer)
class StringSerializer(Serializer):
    """Serializer that writes a string value to the given output stream."""

    def __init__(self):
        pass

    def serialize(self, value, out):
        """Write ``value`` to ``out``.

        value: the string to serialize (``str`` or ``unicode``).
        out: the output stream to write the serialized bytes to.
        """
        # Encode unicode text explicitly as UTF-8 instead of relying on the
        # implicit string-to-byte[] coercion, so non-ASCII characters are
        # written with a well-defined encoding. Plain ``str`` values are
        # written exactly as before.
        if isinstance(value, unicode):
            value = value.encode('utf-8')
        out.write(value)
class StringDeserializer(Deserializer):
    """Deserializer that turns a raw byte array back into a string."""

    def __init__(self):
        pass

    def deserialize(self, bytes):
        """Return the string built from ``bytes`` via ``StringUtil.fromBytes``."""
        decoded = StringUtil.fromBytes(bytes)
        return decoded
# Script entry point: fetch a flow file from the session, run it through
# TestClass and route the result to the success relationship.
flowFile = session.get()
if flowFile is not None:
    session.transfer(session.write(flowFile, TestClass()), REL_SUCCESS)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment