Skip to content

Instantly share code, notes, and snippets.

@mattyb149
Created April 25, 2017 15:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save mattyb149/24b20977b411f5c40e82cf08d961b8a0 to your computer and use it in GitHub Desktop.
Save mattyb149/24b20977b411f5c40e82cf08d961b8a0 to your computer and use it in GitHub Desktop.
A NiFi 1.2.0 template with sample data and a Groovy script illustrating the use of ScriptedReader
<?xml version="1.0" ?>
<template encoding-version="1.1">
<description></description>
<groupId>a5a9dae4-015b-1000-7a1c-eaf2db85b188</groupId>
<name>ScriptedReaderTest</name>
<snippet>
<controllerServices>
<id>bb8e2ece-1712-3f89-0000-000000000000</id>
<parentGroupId>f3e4227c-c306-3c57-0000-000000000000</parentGroupId>
<bundle>
<artifact>nifi-record-serialization-services-nar</artifact>
<group>org.apache.nifi</group>
<version>1.2.0-SNAPSHOT</version>
</bundle>
<comments></comments>
<descriptors>
<entry>
<key>Schema Write Strategy</key>
<value>
<name>Schema Write Strategy</name>
</value>
</entry>
<entry>
<key>Schema Access Strategy</key>
<value>
<name>Schema Access Strategy</name>
</value>
</entry>
<entry>
<key>Schema Registry</key>
<value>
<identifiesControllerService>org.apache.nifi.schemaregistry.services.SchemaRegistry</identifiesControllerService>
<name>Schema Registry</name>
</value>
</entry>
<entry>
<key>Schema Name</key>
<value>
<name>Schema Name</name>
</value>
</entry>
<entry>
<key>schema-text</key>
<value>
<name>schema-text</name>
</value>
</entry>
<entry>
<key>Date Format</key>
<value>
<name>Date Format</name>
</value>
</entry>
<entry>
<key>Time Format</key>
<value>
<name>Time Format</name>
</value>
</entry>
<entry>
<key>Timestamp Format</key>
<value>
<name>Timestamp Format</name>
</value>
</entry>
<entry>
<key>CSV Format</key>
<value>
<name>CSV Format</name>
</value>
</entry>
<entry>
<key>Value Separator</key>
<value>
<name>Value Separator</name>
</value>
</entry>
<entry>
<key>Include Header Line</key>
<value>
<name>Include Header Line</name>
</value>
</entry>
<entry>
<key>Quote Character</key>
<value>
<name>Quote Character</name>
</value>
</entry>
<entry>
<key>Escape Character</key>
<value>
<name>Escape Character</name>
</value>
</entry>
<entry>
<key>Comment Marker</key>
<value>
<name>Comment Marker</name>
</value>
</entry>
<entry>
<key>Null String</key>
<value>
<name>Null String</name>
</value>
</entry>
<entry>
<key>Trim Fields</key>
<value>
<name>Trim Fields</name>
</value>
</entry>
<entry>
<key>Quote Mode</key>
<value>
<name>Quote Mode</name>
</value>
</entry>
<entry>
<key>Record Separator</key>
<value>
<name>Record Separator</name>
</value>
</entry>
<entry>
<key>Include Trailing Delimiter</key>
<value>
<name>Include Trailing Delimiter</name>
</value>
</entry>
</descriptors>
<name>CSVRecordSetWriter</name>
<persistsState>false</persistsState>
<properties>
<entry>
<key>Schema Write Strategy</key>
<value>full-schema-attribute</value>
</entry>
<entry>
<key>Schema Access Strategy</key>
<value>schema-name</value>
</entry>
<entry>
<key>Schema Registry</key>
<value>16091e45-f03a-3b17-0000-000000000000</value>
</entry>
<entry>
<key>Schema Name</key>
<value>code</value>
</entry>
<entry>
<key>schema-text</key>
<value>${avro.schema}</value>
</entry>
<entry>
<key>Date Format</key>
<value>yyyy-MM-dd</value>
</entry>
<entry>
<key>Time Format</key>
<value>HH:mm:ss</value>
</entry>
<entry>
<key>Timestamp Format</key>
<value>yyyy-MM-dd HH:mm:ss</value>
</entry>
<entry>
<key>CSV Format</key>
<value>custom</value>
</entry>
<entry>
<key>Value Separator</key>
<value>,</value>
</entry>
<entry>
<key>Include Header Line</key>
<value>true</value>
</entry>
<entry>
<key>Quote Character</key>
<value>"</value>
</entry>
<entry>
<key>Escape Character</key>
<value>\</value>
</entry>
<entry>
<key>Comment Marker</key>
</entry>
<entry>
<key>Null String</key>
</entry>
<entry>
<key>Trim Fields</key>
<value>true</value>
</entry>
<entry>
<key>Quote Mode</key>
<value>MINIMAL</value>
</entry>
<entry>
<key>Record Separator</key>
<value>\n</value>
</entry>
<entry>
<key>Include Trailing Delimiter</key>
<value>false</value>
</entry>
</properties>
<state>ENABLED</state>
<type>org.apache.nifi.csv.CSVRecordSetWriter</type>
</controllerServices>
<controllerServices>
<id>db4be0ae-793d-3d98-0000-000000000000</id>
<parentGroupId>f3e4227c-c306-3c57-0000-000000000000</parentGroupId>
<bundle>
<artifact>nifi-scripting-nar</artifact>
<group>org.apache.nifi</group>
<version>1.2.0-SNAPSHOT</version>
</bundle>
<comments></comments>
<descriptors>
<entry>
<key>Script Engine</key>
<value>
<name>Script Engine</name>
</value>
</entry>
<entry>
<key>Script File</key>
<value>
<name>Script File</name>
</value>
</entry>
<entry>
<key>Script Body</key>
<value>
<name>Script Body</name>
</value>
</entry>
<entry>
<key>Module Directory</key>
<value>
<name>Module Directory</name>
</value>
</entry>
</descriptors>
<name>ScriptedReader</name>
<persistsState>false</persistsState>
<properties>
<entry>
<key>Script Engine</key>
<value>Groovy</value>
</entry>
<entry>
<key>Script File</key>
</entry>
<entry>
<key>Script Body</key>
<value>import groovy.json.JsonSlurper
import org.apache.nifi.controller.*
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.schema.access.SchemaNotFoundException
import org.apache.nifi.serialization.*
import org.apache.nifi.serialization.record.*
/**
 * RecordReader for XML content. The whole document is parsed in the constructor, and every
 * child of the root element whose tag equals recordTag becomes one MapRecord.
 * Each schema field takes the text of the record's child element with the same name,
 * or null when the record has no such child element.
 */
class GroovyXmlRecordReader implements RecordReader {
    // Iterator over the MapRecords built in the constructor
    def recordIterator
    // Schema shared by every record this reader returns
    def recordSchema

    /**
     * @param recordTag   tag name of the root's child elements that represent records
     * @param schema      schema whose field names select the child elements of each record
     * @param inputStream XML content; it is parsed completely here
     */
    GroovyXmlRecordReader(final String recordTag, final RecordSchema schema, final InputStream inputStream) {
        recordSchema = schema
        def xml = new XmlSlurper().parse(inputStream)
        // Change the XML fields to a MapRecord for each incoming record
        recordIterator = xml[recordTag].collect {r -&gt;
            // One entry per schema field, keyed by field name. The value is the text of the
            // same-named child element. If the record lacks that element, use null instead of
            // an empty string (an empty string presumably fails conversion for numeric types).
            def fields = recordSchema.fieldNames.collectEntries {fieldName -&gt;
                def child = r[fieldName]
                [(fieldName): child.size() &gt; 0 ? child.text() : null]
            }
            new MapRecord(recordSchema, fields)
        }.iterator()
    }

    /** @return the next record, or null once all records have been returned */
    Record nextRecord() throws IOException, MalformedRecordException {
        return recordIterator?.hasNext() ? recordIterator.next() : null
    }

    /** @return the schema passed to the constructor */
    RecordSchema getSchema() throws MalformedRecordException {
        return recordSchema
    }

    // Nothing to release: the document was already parsed in the constructor
    void close() throws IOException { }
}
/**
 * RecordReaderFactory handed to the ScriptedReader controller service through the script
 * variable "reader". It builds the record schema from the FlowFile's 'schema.text' attribute
 * and returns a GroovyXmlRecordReader over the elements named by the 'record.tag' attribute.
 */
class GroovyXmlRecordReaderFactory extends AbstractControllerService implements RecordReaderFactory {
    // Presumably injected by the ScriptedReader controller service (an earlier note said
    // "ScriptedRecordReaderFactory") — TODO confirm; this script never reads it
    ConfigurationContext configurationContext

    /**
     * @param flowFile    must carry the 'schema.text' and 'record.tag' attributes
     * @param inputStream XML content to read
     * @param logger      component logger (not used here)
     * @return a reader over the records in inputStream
     * @throws IOException if an attribute is missing or 'schema.text' is not a valid field list
     */
    RecordReader createRecordReader(FlowFile flowFile, InputStream inputStream, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
        // Expecting 'schema.text' to be a JSON array of single-entry objects, each mapping a
        // field name to a RecordFieldType name, e.g. [{"id": "int"}, {"name": "string"}]
        def schemaText = flowFile.getAttribute('schema.text')
        if (!schemaText) throw new IOException('No schema set in schema.text')
        def recordTag = flowFile.getAttribute('record.tag')
        if (!recordTag) throw new IOException('No record tag set in record.tag')
        def jsonSchema = new JsonSlurper().parseText(schemaText)
        if (!(jsonSchema instanceof List)) throw new IOException('schema.text must be a JSON array')
        def recordSchema = new SimpleRecordSchema(jsonSchema.collect {field -&gt;
            if (!(field instanceof Map) || field.isEmpty()) {
                throw new IOException('Invalid field definition in schema.text: ' + field)
            }
            def entry = field.entrySet().iterator().next()
            // RecordFieldType.of returns null for a name it does not know, so report it
            // here instead of failing later on a null dereference
            def fieldType = RecordFieldType.of(entry.value as String)
            if (fieldType == null) {
                throw new IOException('Unknown field type ' + entry.value + ' for field ' + entry.key + ' in schema.text')
            }
            new RecordField(entry.key, fieldType.dataType)
        } as List&lt;RecordField&gt;)
        return new GroovyXmlRecordReader(recordTag, recordSchema, inputStream)
    }
}
// Create an instance of RecordReaderFactory called "reader"; this variable is the entry point for ScriptedReader
reader = new GroovyXmlRecordReaderFactory()
</value>
</entry>
<entry>
<key>Module Directory</key>
</entry>
</properties>
<state>ENABLED</state>
<type>org.apache.nifi.record.script.ScriptedReader</type>
</controllerServices>
<controllerServices>
<id>16091e45-f03a-3b17-0000-000000000000</id>
<parentGroupId>f3e4227c-c306-3c57-0000-000000000000</parentGroupId>
<bundle>
<artifact>nifi-registry-nar</artifact>
<group>org.apache.nifi</group>
<version>1.2.0-SNAPSHOT</version>
</bundle>
<comments></comments>
<descriptors>
<entry>
<key>code</key>
<value>
<name>code</name>
</value>
</entry>
<entry>
<key>name</key>
<value>
<name>name</name>
</value>
</entry>
<entry>
<key>user</key>
<value>
<name>user</name>
</value>
</entry>
</descriptors>
<name>AvroSchemaRegistry</name>
<persistsState>false</persistsState>
<properties>
<entry>
<key>code</key>
<value>{
"type": "record",
"name": "name",
"fields" : [
{"name": "id", "type": "long"},
{"name": "name", "type": ["null", "string"]},
{"name": "code", "type": "long"}
]
}</value>
</entry>
<entry>
<key>name</key>
<value>{
"type": "record",
"name": "name",
"fields" : [
{"name": "id", "type": "long"},
{"name": "name", "type": ["null", "string"]}
]
}</value>
</entry>
<entry>
<key>user</key>
<value>{
"type": "record",
"name": "UserRecord",
"fields" : [
{"name": "id", "type": "long"},
{"name": "title", "type": ["null", "string"]},
{"name": "first", "type": ["null", "string"]},
{"name": "last", "type": ["null", "string"]},
{"name": "street", "type": ["null", "string"]},
{"name": "city", "type": ["null", "string"]},
{"name": "state", "type": ["null", "string"]},
{"name": "zip", "type": ["null", "string"]},
{"name": "gender", "type": ["null", "string"]},
{"name": "email", "type": ["null", "string"]},
{"name": "username", "type": ["null", "string"]},
{"name": "password", "type": ["null", "string"]},
{"name": "phone", "type": ["null", "string"]},
{"name": "cell", "type": ["null", "string"]},
{"name": "ssn", "type": ["null", "string"]},
{"name": "date_of_birth", "type": ["null", "string"]},
{"name": "reg_date", "type": ["null", "string"]},
{"name": "large", "type": ["null", "string"]},
{"name": "medium", "type": ["null", "string"]},
{"name": "thumbnail", "type": ["null", "string"]},
{"name": "version", "type": ["null", "string"]},
{"name": "nationality", "type": ["null", "string"]}
]
}</value>
</entry>
</properties>
<state>ENABLED</state>
<type>org.apache.nifi.schemaregistry.services.AvroSchemaRegistry</type>
</controllerServices>
<processors>
<id>a359e14e-325b-3731-0000-000000000000</id>
<parentGroupId>f3e4227c-c306-3c57-0000-000000000000</parentGroupId>
<position>
<x>679.5938453247036</x>
<y>248.81901560117888</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.2.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>record-reader</key>
<value>
<identifiesControllerService>org.apache.nifi.serialization.RecordReaderFactory</identifiesControllerService>
<name>record-reader</name>
</value>
</entry>
<entry>
<key>record-writer</key>
<value>
<identifiesControllerService>org.apache.nifi.serialization.RecordSetWriterFactory</identifiesControllerService>
<name>record-writer</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>record-reader</key>
<value>db4be0ae-793d-3d98-0000-000000000000</value>
</entry>
<entry>
<key>record-writer</key>
<value>bb8e2ece-1712-3f89-0000-000000000000</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>ConvertRecord</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.ConvertRecord</type>
</processors>
<processors>
<id>ec5ae3df-0b62-3112-0000-000000000000</id>
<parentGroupId>f3e4227c-c306-3c57-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>949.3994791857035</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.2.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Log Level</key>
<value>
<name>Log Level</name>
</value>
</entry>
<entry>
<key>Log Payload</key>
<value>
<name>Log Payload</name>
</value>
</entry>
<entry>
<key>Attributes to Log</key>
<value>
<name>Attributes to Log</name>
</value>
</entry>
<entry>
<key>Attributes to Ignore</key>
<value>
<name>Attributes to Ignore</name>
</value>
</entry>
<entry>
<key>Log prefix</key>
<value>
<name>Log prefix</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Log Level</key>
<value>info</value>
</entry>
<entry>
<key>Log Payload</key>
<value>false</value>
</entry>
<entry>
<key>Attributes to Log</key>
</entry>
<entry>
<key>Attributes to Ignore</key>
</entry>
<entry>
<key>Log prefix</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>LogAttribute</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.LogAttribute</type>
</processors>
<processors>
<id>6982bc85-a7b2-3400-0000-000000000000</id>
<parentGroupId>f3e4227c-c306-3c57-0000-000000000000</parentGroupId>
<position>
<x>672.3773977059673</x>
<y>0.0</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.2.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>File Size</key>
<value>
<name>File Size</name>
</value>
</entry>
<entry>
<key>Batch Size</key>
<value>
<name>Batch Size</name>
</value>
</entry>
<entry>
<key>Data Format</key>
<value>
<name>Data Format</name>
</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>
<name>Unique FlowFiles</name>
</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>
<name>generate-ff-custom-text</name>
</value>
</entry>
<entry>
<key>record.tag</key>
<value>
<name>record.tag</name>
</value>
</entry>
<entry>
<key>schema.text</key>
<value>
<name>schema.text</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>File Size</key>
<value>0B</value>
</entry>
<entry>
<key>Batch Size</key>
<value>1</value>
</entry>
<entry>
<key>Data Format</key>
<value>Text</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>false</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>&lt;root&gt;
&lt;myRecord&gt;
&lt;id&gt;1&lt;/id&gt;
&lt;name&gt;John&lt;/name&gt;
&lt;code&gt;100&lt;/code&gt;
&lt;/myRecord&gt;
&lt;myRecord&gt;
&lt;id&gt;2&lt;/id&gt;
&lt;name&gt;Mary&lt;/name&gt;
&lt;code&gt;200&lt;/code&gt;
&lt;/myRecord&gt;
&lt;myRecord&gt;
&lt;id&gt;3&lt;/id&gt;
&lt;name&gt;Ramon&lt;/name&gt;
&lt;code&gt;300&lt;/code&gt;
&lt;/myRecord&gt;
&lt;/root&gt;</value>
</entry>
<entry>
<key>record.tag</key>
<value>myRecord</value>
</entry>
<entry>
<key>schema.text</key>
<value>[
{"id": "int"},
{"name": "string"},
{"code": "int"}
]</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>30 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>GenerateFlowFile</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.GenerateFlowFile</type>
</processors>
</snippet>
<timestamp>04/25/2017 11:56:16 EDT</timestamp>
</template>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment