Skip to content

Instantly share code, notes, and snippets.

@randerzander
Last active March 12, 2017 20:17
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save randerzander/bff28a1383df4d5e75cdf42ebb8be17d to your computer and use it in GitHub Desktop.
Save randerzander/bff28a1383df4d5e75cdf42ebb8be17d to your computer and use it in GitHub Desktop.
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import os, sys, imp, traceback, time
# Directory holding the parser modules: a flowfile's 'parser' attribute
# names the file <parser>.py that is loaded from here.
parser_path = '/scripts/parsers/'
class PyStreamCallback(StreamCallback):
    """NiFi StreamCallback that overwrites flowfile content with a fixed value.

    The incoming content stream is never read; whatever was passed to the
    constructor is written to the output stream unchanged.
    """

    def __init__(self, result):
        # In this script, the value returned by a parser module's parse().
        self.result = result

    def process(self, instream, outstream):
        """Write the stored result to *outstream*, ignoring *instream*."""
        payload = self.result
        outstream.write(payload)
def fail(flowfile, err):
    """Record *err* on *flowfile* as 'parse.error' and route it to REL_FAILURE."""
    tagged = session.putAttribute(flowfile, 'parse.error', err)
    session.transfer(tagged, REL_FAILURE)
def _load_parser(parser):
    """Return the parser module *parser*, (re)loading it from parser_path.

    The module is loaded from ``parser_path + parser + '.py'`` with
    ``imp.load_source`` when it is not yet in ``sys.modules`` or when the
    file's modification time is newer than the ``loaded_at`` stamp set on the
    previous load.

    Raises ValueError if the name resolves to a file outside parser_path.
    Errors from ``os.path.getmtime`` / ``imp.load_source`` (missing file,
    import failure) propagate to the caller.
    """
    path = parser_path + parser + '.py'
    # The name comes from a flowfile attribute and the file is executed when
    # loaded, so refuse names such as '../x' that escape parser_path.
    root = os.path.join(os.path.abspath(parser_path), '')
    if not os.path.abspath(path).startswith(root):
        raise ValueError('parser %r resolves outside %s' % (parser, parser_path))
    if parser not in sys.modules or os.path.getmtime(path) > sys.modules[parser].loaded_at:
        module = imp.load_source(parser, path)
        # int() truncation keeps the stamp at or before the real load time,
        # so edits made after loading are still detected.
        module.loaded_at = int(time.time())
    return sys.modules[parser]


def _read_content(flowfile, parse_module):
    """Read the whole content of *flowfile* in the form *parse_module* wants.

    Returns a byte array (IOUtils.toByteArray) when the module declares
    ``format = 'binary'`` (case-insensitive), otherwise the content decoded
    as UTF-8. The input stream is closed even if reading fails.
    """
    instream = session.read(flowfile)
    try:
        if hasattr(parse_module, 'format') and parse_module.format.lower() == 'binary':
            return IOUtils.toByteArray(instream)
        return IOUtils.toString(instream, StandardCharsets.UTF_8)
    finally:
        instream.close()


def process(flowfile):
    """Parse *flowfile* with the parser module named by its 'parser' attribute.

    The module's ``parse(data)`` receives a dict holding 'content' plus one
    entry per name in the module's optional ``attributes`` sequence, copied
    from the flowfile's attributes. On success the flowfile content is
    replaced by parse()'s return value and the flowfile goes to REL_SUCCESS.
    Any failure (missing attribute, loading, reading or parsing) sends it to
    REL_FAILURE via fail(), with a 'parse.error' message naming the stage.
    """
    parser = flowfile.getAttribute('parser')
    if not parser:
        fail(flowfile, "Loading Module: flowfile has no 'parser' attribute")
        return
    # The bare excepts below are deliberate: under Jython, Java exceptions
    # (e.g. from the NiFi session API) may not derive from Python's
    # Exception, and every failure must still route the flowfile.
    try:
        parse_module = _load_parser(parser)
    except:
        fail(flowfile, 'Loading Module: ' + traceback.format_exc())
        return
    try:
        data = {'content': _read_content(flowfile, parse_module)}
    except:
        fail(flowfile, 'Reading Content: ' + traceback.format_exc())
        return
    try:
        for attribute in getattr(parse_module, 'attributes', ()):
            data[attribute] = flowfile.getAttribute(attribute)
        result = parse_module.parse(data)
        flowfile = session.write(flowfile, PyStreamCallback(result))
        session.transfer(flowfile, REL_SUCCESS)
    except:
        fail(flowfile, 'Parsing: ' + traceback.format_exc())
# Execution starts here. `session`, REL_SUCCESS and REL_FAILURE are not
# defined in this script; presumably NiFi's ExecuteScript binds them.
# session.get() may return None (no flowfile available), in which case
# there is nothing to do.
flowfile = session.get()
if flowfile is not None:
    process(flowfile)
import json, datetime
# Flowfile attributes this parser needs. The runner copies each one into
# data[name] before calling parse(); parse() reads 'filename' and 's2s.host'.
attributes = ['filename', 's2s.host']
def parse(data):
    """Convert ``ps aux``-style text into newline-delimited JSON records.

    data: dict with
      'content'  -- text whose first line is a column header (skipped) and
                    whose remaining lines are whitespace-separated rows;
      'filename' -- its first 13 characters are parsed as an integer of
                    epoch milliseconds (assumed naming convention; confirm
                    with the producer);
      's2s.host' -- copied into every record as 'host'.

    Returns one JSON object per non-blank row, joined with '\\n'. 'cpu' and
    'mem' are floats, 'vsz' and 'rss' ints, all other columns strings. A row
    with too few columns or a non-numeric value raises (IndexError or
    ValueError).
    """
    # Floor division keeps whole seconds on both Jython 2 and Python 3 (plain
    # '/' would add fractional seconds on Python 3). fromtimestamp() renders
    # the time in the host's local timezone.
    ts = str(datetime.datetime.fromtimestamp(int(data['filename'][0:13]) // 1000))
    fields = ['user', 'pid', 'cpu', 'mem', 'vsz', 'rss', 'tty', 'stat',
              'startup', 'time', 'command']
    records = []
    for line in data['content'].split('\n')[1:]:
        tokens = line.split()
        if not tokens:
            continue
        # COMMAND may itself contain spaces: keep the first ten columns and
        # rejoin everything after them into the last field.
        tokens = tokens[0:10] + [' '.join(tokens[10:])]
        record = {'ts': ts, 'host': data['s2s.host']}
        for idx, field in enumerate(fields):
            if field in ('cpu', 'mem'):
                record[field] = float(tokens[idx])
            elif field in ('vsz', 'rss'):
                record[field] = int(tokens[idx])
            else:
                record[field] = tokens[idx]
        records.append(json.dumps(record))
    return '\n'.join(records)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment