Skip to content

Instantly share code, notes, and snippets.

Created April 25, 2017 15:57
Show Gist options
  • Save mattyb149/24b20977b411f5c40e82cf08d961b8a0 to your computer and use it in GitHub Desktop.
Save mattyb149/24b20977b411f5c40e82cf08d961b8a0 to your computer and use it in GitHub Desktop.
A NiFi 1.2.0 template with sample data and script to illustrate the use of ScriptedReader
<?xml version="1.0" ?>
<template encoding-version="1.1">
<key>Schema Write Strategy</key>
<name>Schema Write Strategy</name>
<key>Schema Access Strategy</key>
<name>Schema Access Strategy</name>
<key>Schema Registry</key>
<name>Schema Registry</name>
<key>Schema Name</key>
<name>Schema Name</name>
<key>Date Format</key>
<name>Date Format</name>
<key>Time Format</key>
<name>Time Format</name>
<key>Timestamp Format</key>
<name>Timestamp Format</name>
<key>CSV Format</key>
<name>CSV Format</name>
<key>Value Separator</key>
<name>Value Separator</name>
<key>Include Header Line</key>
<name>Include Header Line</name>
<key>Quote Character</key>
<name>Quote Character</name>
<key>Escape Character</key>
<name>Escape Character</name>
<key>Comment Marker</key>
<name>Comment Marker</name>
<key>Null String</key>
<name>Null String</name>
<key>Trim Fields</key>
<name>Trim Fields</name>
<key>Quote Mode</key>
<name>Quote Mode</name>
<key>Record Separator</key>
<name>Record Separator</name>
<key>Include Trailing Delimiter</key>
<name>Include Trailing Delimiter</name>
<key>Schema Write Strategy</key>
<key>Schema Access Strategy</key>
<key>Schema Registry</key>
<key>Schema Name</key>
<key>Date Format</key>
<key>Time Format</key>
<key>Timestamp Format</key>
<value>yyyy-MM-dd HH:mm:ss</value>
<key>CSV Format</key>
<key>Value Separator</key>
<key>Include Header Line</key>
<key>Quote Character</key>
<key>Escape Character</key>
<key>Comment Marker</key>
<key>Null String</key>
<key>Trim Fields</key>
<key>Quote Mode</key>
<key>Record Separator</key>
<key>Include Trailing Delimiter</key>
<key>Script Engine</key>
<name>Script Engine</name>
<key>Script File</key>
<name>Script File</name>
<key>Script Body</key>
<name>Script Body</name>
<key>Module Directory</key>
<name>Module Directory</name>
<key>Script Engine</key>
<key>Script File</key>
<key>Script Body</key>
<value>import groovy.json.JsonSlurper
import org.apache.nifi.controller.*
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.schema.access.SchemaNotFoundException
import org.apache.nifi.serialization.*
import org.apache.nifi.serialization.record.*
/**
 * RecordReader that parses the whole incoming XML document in its constructor and then
 * hands out one MapRecord per element selected by the record tag.
 */
class GroovyXmlRecordReader implements RecordReader {

    // Iterator over the MapRecords built in the constructor
    def recordIterator
    // Schema attached to every record this reader produces
    def recordSchema

    /**
     * Parses the XML from inputStream and prepares the records to be returned by nextRecord().
     *
     * @param recordTag   name used to look up the record elements on the parsed document (xml[recordTag])
     * @param schema      schema whose field names are used as keys into each record element
     * @param inputStream stream containing the XML document
     */
    GroovyXmlRecordReader(final String recordTag, final RecordSchema schema, final InputStream inputStream) {
        recordSchema = schema
        // NOTE(review): the default XmlSlurper configuration is used on flow file content; if that content
        // can be untrusted, consider disabling DOCTYPE declarations on the parser - confirm with the flow owner
        def xml = new XmlSlurper().parse(inputStream)
        // Change the XML fields to a MapRecord for each incoming record
        recordIterator = xml[recordTag].collect {r -&gt;
            // Create a map of field names to values, using the field names from the schema as keys into the XML object
            def fields = recordSchema.fieldNames.inject([:]) {result, fieldName -&gt;
                result[fieldName] = r[fieldName].toString()
                // inject feeds the closure's return value into the next iteration, so hand the map back
                result
            }
            new MapRecord(recordSchema, fields)
        }.iterator() // collect returns a List; nextRecord() needs hasNext()/next()
    }

    /**
     * @return the next record, or null when there are no more records
     */
    Record nextRecord() throws IOException, MalformedRecordException {
        return recordIterator?.hasNext() ? recordIterator.next() : null
    }

    /**
     * @return the schema passed to the constructor
     */
    RecordSchema getSchema() throws MalformedRecordException {
        return recordSchema
    }

    // Nothing to release here; the input stream is not closed by this reader
    void close() throws IOException { }
}
/**
 * Controller service that builds a GroovyXmlRecordReader for a flow file, taking the record
 * schema from the 'schema.text' attribute and the record element name from 'record.tag'.
 */
class GroovyXmlRecordReaderFactory extends AbstractControllerService implements RecordReaderFactory {

    // Will be set by the ScriptedRecordReaderFactory
    ConfigurationContext configurationContext

    /**
     * Creates a reader over inputStream using the schema and record tag carried on the flow file.
     *
     * @param flowFile    flow file providing the 'schema.text' and 'record.tag' attributes
     * @param inputStream stream containing the XML content to read
     * @param logger      component logger (not used by this implementation)
     * @return a GroovyXmlRecordReader for the given stream
     * @throws IOException if 'schema.text' or 'record.tag' is missing or empty
     * @throws SchemaNotFoundException if a field's type name is not a known RecordFieldType
     */
    RecordReader createRecordReader(FlowFile flowFile, InputStream inputStream, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
        // Expecting 'schema.text' to be a JSON array of objects, each with a single entry whose key is the
        // field name and whose value names a RecordFieldType
        def schemaText = flowFile.getAttribute('schema.text')
        if (!schemaText) throw new IOException('No schema set in schema.text')

        // Fail early with a clear message instead of looking up a null tag in the parsed XML
        def recordTag = flowFile.getAttribute('record.tag')
        if (!recordTag) throw new IOException('No record tag set in record.tag')

        def jsonSchema = new JsonSlurper().parseText(schemaText)
        def recordSchema = new SimpleRecordSchema(jsonSchema.collect {field -&gt;
            // Only the first entry of each object is used
            def entry = field.entrySet()[0]
            def fieldType = RecordFieldType.of(entry.value)
            // Report an unrecognised type name explicitly rather than dereferencing null below
            if (fieldType == null) throw new SchemaNotFoundException("Unknown type '${entry.value}' for field '${entry.key}' in schema.text")
            new RecordField(entry.key, fieldType.dataType)
        } as List&lt;RecordField&gt;)
        return new GroovyXmlRecordReader(recordTag, recordSchema, inputStream)
    }
}
// Create an instance of RecordReaderFactory called "reader", this is the entry point for ScriptedReader
reader = new GroovyXmlRecordReaderFactory()
<key>Module Directory</key>
"type": "record",
"name": "name",
"fields" : [
{"name": "id", "type": "long"},
{"name": "name", "type": ["null", "string"]},
{"name": "code", "type": "long"}
"type": "record",
"name": "name",
"fields" : [
{"name": "id", "type": "long"},
{"name": "name", "type": ["null", "string"]}
"type": "record",
"name": "UserRecord",
"fields" : [
{"name": "id", "type": "long"},
{"name": "title", "type": ["null", "string"]},
{"name": "first", "type": ["null", "string"]},
{"name": "last", "type": ["null", "string"]},
{"name": "street", "type": ["null", "string"]},
{"name": "city", "type": ["null", "string"]},
{"name": "state", "type": ["null", "string"]},
{"name": "zip", "type": ["null", "string"]},
{"name": "gender", "type": ["null", "string"]},
{"name": "email", "type": ["null", "string"]},
{"name": "username", "type": ["null", "string"]},
{"name": "password", "type": ["null", "string"]},
{"name": "phone", "type": ["null", "string"]},
{"name": "cell", "type": ["null", "string"]},
{"name": "ssn", "type": ["null", "string"]},
{"name": "date_of_birth", "type": ["null", "string"]},
{"name": "reg_date", "type": ["null", "string"]},
{"name": "large", "type": ["null", "string"]},
{"name": "medium", "type": ["null", "string"]},
{"name": "thumbnail", "type": ["null", "string"]},
{"name": "version", "type": ["null", "string"]},
{"name": "nationality", "type": ["null", "string"]}
<penaltyDuration>30 sec</penaltyDuration>
<schedulingPeriod>0 sec</schedulingPeriod>
<yieldDuration>1 sec</yieldDuration>
<key>Log Level</key>
<name>Log Level</name>
<key>Log Payload</key>
<name>Log Payload</name>
<key>Attributes to Log</key>
<name>Attributes to Log</name>
<key>Attributes to Ignore</key>
<name>Attributes to Ignore</name>
<key>Log prefix</key>
<name>Log prefix</name>
<penaltyDuration>30 sec</penaltyDuration>
<key>Log Level</key>
<key>Log Payload</key>
<key>Attributes to Log</key>
<key>Attributes to Ignore</key>
<key>Log prefix</key>
<schedulingPeriod>0 sec</schedulingPeriod>
<yieldDuration>1 sec</yieldDuration>
<key>File Size</key>
<name>File Size</name>
<key>Batch Size</key>
<name>Batch Size</name>
<key>Data Format</key>
<name>Data Format</name>
<key>Unique FlowFiles</key>
<name>Unique FlowFiles</name>
<penaltyDuration>30 sec</penaltyDuration>
<key>File Size</key>
<key>Batch Size</key>
<key>Data Format</key>
<key>Unique FlowFiles</key>
{"id": "int"},
{"name": "string"},
{"code": "int"}
<schedulingPeriod>30 sec</schedulingPeriod>
<yieldDuration>1 sec</yieldDuration>
<timestamp>04/25/2017 11:56:16 EDT</timestamp>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment