Skip to content

Instantly share code, notes, and snippets.

@ryanpersaud
Last active May 26, 2016 16:36
Show Gist options
  • Save ryanpersaud/734b68e3624d06433deaa114acc33865 to your computer and use it in GitHub Desktop.
Save ryanpersaud/734b68e3624d06433deaa114acc33865 to your computer and use it in GitHub Desktop.
A NiFi template for the ExecuteScript processor that uses Jython to convert string timestamp fields in JSON content into long values (milliseconds since the epoch). Based on @mattyb149's template (https://gist.github.com/mattyb149/89205fcbc6d0e15ba024)
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<template>
<description>Converts string timestamps to long in JSON.
</description>
<name>ConvertStringTimestampsToLong</name>
<snippet>
<processors>
<id>c6cf8df9-bfcf-45e9-ab06-76de85e78f78</id>
<parentGroupId>a27e19b1-ef3e-41a2-b0ce-266370b14b47</parentGroupId>
<position>
<x>1233.620849609375</x>
<y>5.457275390625</y>
</position>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<defaultConcurrentTasks>
<entry>
<key>TIMER_DRIVEN</key>
<value>1</value>
</entry>
<entry>
<key>EVENT_DRIVEN</key>
<value>0</value>
</entry>
<entry>
<key>CRON_DRIVEN</key>
<value>1</value>
</entry>
</defaultConcurrentTasks>
<defaultSchedulingPeriod>
<entry>
<key>TIMER_DRIVEN</key>
<value>0 sec</value>
</entry>
<entry>
<key>CRON_DRIVEN</key>
<value>* * * * * ?</value>
</entry>
</defaultSchedulingPeriod>
<descriptors>
<entry>
<key>Script Engine</key>
<value>
<allowableValues>
<displayName>ECMAScript</displayName>
<value>ECMAScript</value>
</allowableValues>
<allowableValues>
<displayName>Groovy</displayName>
<value>Groovy</value>
</allowableValues>
<allowableValues>
<displayName>lua</displayName>
<value>lua</value>
</allowableValues>
<allowableValues>
<displayName>python</displayName>
<value>python</value>
</allowableValues>
<allowableValues>
<displayName>ruby</displayName>
<value>ruby</value>
</allowableValues>
<defaultValue>ECMAScript</defaultValue>
<description>The engine to execute scripts</description>
<displayName>Script Engine</displayName>
<dynamic>false</dynamic>
<name>Script Engine</name>
<required>true</required>
<sensitive>false</sensitive>
<supportsEl>false</supportsEl>
</value>
</entry>
<entry>
<key>Script File</key>
<value>
<description>Path to script file to execute. Only one of Script File or Script Body may be used
</description>
<displayName>Script File</displayName>
<dynamic>false</dynamic>
<name>Script File</name>
<required>false</required>
<sensitive>false</sensitive>
<supportsEl>true</supportsEl>
</value>
</entry>
<entry>
<key>Script Body</key>
<value>
<description>Body of script to execute. Only one of Script File or Script Body may be used</description>
<displayName>Script Body</displayName>
<dynamic>false</dynamic>
<name>Script Body</name>
<required>false</required>
<sensitive>false</sensitive>
<supportsEl>false</supportsEl>
</value>
</entry>
<entry>
<key>Module Directory</key>
<value>
<description>Comma-separated list of paths to files and/or directories which contain modules required by
the script.
</description>
<displayName>Module Directory</displayName>
<dynamic>false</dynamic>
<name>Module Directory</name>
<required>false</required>
<sensitive>false</sensitive>
<supportsEl>false</supportsEl>
</value>
</entry>
<entry>
<key>endTime</key>
<value>
<description></description>
<displayName>endTime</displayName>
<dynamic>true</dynamic>
<name>endTime</name>
<required>false</required>
<sensitive>false</sensitive>
<supportsEl>true</supportsEl>
</value>
</entry>
<entry>
<key>mrt</key>
<value>
<description></description>
<displayName>mrt</displayName>
<dynamic>true</dynamic>
<name>mrt</name>
<required>false</required>
<sensitive>false</sensitive>
<supportsEl>true</supportsEl>
</value>
</entry>
<entry>
<key>rt</key>
<value>
<description></description>
<displayName>rt</displayName>
<dynamic>true</dynamic>
<name>rt</name>
<required>false</required>
<sensitive>false</sensitive>
<supportsEl>true</supportsEl>
</value>
</entry>
<entry>
<key>startTime</key>
<value>
<description></description>
<displayName>startTime</displayName>
<dynamic>true</dynamic>
<name>startTime</name>
<required>false</required>
<sensitive>false</sensitive>
<supportsEl>true</supportsEl>
</value>
</entry>
</descriptors>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Script Engine</key>
<value>python</value>
</entry>
<entry>
<key>Script File</key>
</entry>
<entry>
<key>Script Body</key>
<value>
import json
import java.io
import time
import datetime
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
class PyStreamCallback(StreamCallback):
    """Rewrites JSON flow file content, converting string timestamps to epoch milliseconds.

    Each dynamic processor property names a top-level JSON field to convert; the
    property's value is the Python strptime format used to parse that field.
    """

    def __init__(self, properties):
        # Processor properties (descriptor to configured value), as passed in from
        # context.getProperties() by the script entry point.
        self.properties = properties

    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        obj = json.loads(text)
        # Fields can only be looked up by name on a top-level JSON object; for an
        # array or scalar there is nothing to convert, so it is re-serialized as-is.
        if isinstance(obj, dict):
            for descriptor in self.properties:
                name = descriptor.getName()
                if not descriptor.isDynamic() or name not in obj:
                    continue
                value = obj[name]
                if value is None:
                    # A JSON null carries no timestamp to parse; leave it null.
                    continue
                parsed = datetime.datetime.strptime(value, self.properties[descriptor])
                # time.mktime() interprets the struct_time as local time, and
                # timetuple() has no sub-second component, so the milliseconds parsed
                # via '%f' must be added back explicitly instead of being truncated.
                seconds = int(time.mktime(parsed.timetuple()))
                obj[name] = seconds * 1000 + parsed.microsecond // 1000
        outputStream.write(bytearray(json.dumps(obj, indent=4).encode('utf-8')))
# Script entry point. `session`, `context`, and REL_SUCCESS are names supplied by
# the ExecuteScript processor; PyStreamCallback is defined above.
import os

flowFile = session.get()
if flowFile is not None:
    properties = context.getProperties()
    flowFile = session.write(flowFile, PyStreamCallback(properties))
    # Strip only the final extension so dotted names keep their full stem:
    # 'data.v2.json' becomes 'data.v2_translated.json' (splitting on the first '.'
    # would have produced 'data_translated.json', and '.hidden' an empty stem).
    stem = os.path.splitext(flowFile.getAttribute('filename'))[0]
    flowFile = session.putAttribute(flowFile, 'filename', stem + '_translated.json')
    # This script only ever routes to REL_SUCCESS; an exception raised above
    # propagates out of the script to ExecuteScript instead of going to REL_FAILURE.
    session.transfer(flowFile, REL_SUCCESS)
</value>
</entry>
<entry>
<key>Module Directory</key>
</entry>
<entry>
<key>endTime</key>
<value>%Y-%m-%d %H:%M:%S.%f</value>
</entry>
<entry>
<key>mrt</key>
<value>%Y-%m-%d %H:%M:%S.%f</value>
</entry>
<entry>
<key>rt</key>
<value>%Y-%m-%d %H:%M:%S.%f</value>
</entry>
<entry>
<key>startTime</key>
<value>%Y-%m-%d %H:%M:%S.%f</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>ExecuteScriptConvertTimestampsToLongs</name>
<relationships>
<autoTerminate>true</autoTerminate>
<description>FlowFiles that failed to be processed</description>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<description>FlowFiles that were successfully processed</description>
<name>success</name>
</relationships>
<state>RUNNING</state>
<style/>
<supportsEventDriven>false</supportsEventDriven>
<supportsParallelProcessing>false</supportsParallelProcessing>
<type>org.apache.nifi.processors.script.ExecuteScript</type>
</processors>
</snippet>
<timestamp>05/26/2016 12:18:00 EDT</timestamp>
</template>
@ryanpersaud
Copy link
Author

ryanpersaud commented May 26, 2016

Example configuration: each dynamic property's name is the top-level JSON field to convert, and its value is the Python `strptime` format used to parse that field (for example `%Y-%m-%d %H:%M:%S.%f`).

untitled

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment