Skip to content

Instantly share code, notes, and snippets.

@gaving
Created August 20, 2018 20:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gaving/80cfa88baa95859e35845c1e0eee1993 to your computer and use it in GitHub Desktop.
Save gaving/80cfa88baa95859e35845c1e0eee1993 to your computer and use it in GitHub Desktop.
import json
import sys
import traceback
import csv
from java.nio.charset import StandardCharsets
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import StreamCallback
from org.python.core.util import StringUtil
class PyStreamCallback(StreamCallback):
def __init__(self):
pass
def process(self, inputStream, outputStream):
try:
newText =''
Text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
reader = csv.reader(Text,delimiter=' ')
for row in reader:
newText+=('eeeeeee'.join(row))
# Read input FlowFile content
# input_text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
# input_obj = json.loads(input_text)
# Transform content
outputStream.write(StringUtil.toBytes(newText))
except:
traceback.print_exc(file=sys.stdout)
raise
flowFile = session.get()
if flowFile != None:
flowFile = session.write(flowFile, PyStreamCallback())
# Finish by transferring the FlowFile to an output relationship
session.transfer(flowFile, REL_SUCCESS)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment