Distribute file name list by file size sum

Let's say there are files to be fetched from a remote file system such as HDFS:

$ ll lib
total 1973520
drwxr-xr-x  29 koji  staff       986 Jun  2 09:24 bootstrap
-rw-rw----   1 koji  staff     95806 Apr  4  2016 javax.servlet-api-3.1.0.jar
-rw-rw----   1 koji  staff     16515 Apr 19 09:34 jcl-over-slf4j-1.7.25.jar
-rw-rw----   1 koji  staff    189386 Jun  6  2016 jetty-schemas-3.1.jar
-rw-rw----   1 koji  staff      4596 Apr 19 09:32 jul-to-slf4j-1.7.25.jar
-rw-rw----   1 koji  staff     23645 Apr 19 09:34 log4j-over-slf4j-1.7.25.jar
-rw-rw----   1 koji  staff    290339 Apr 19 09:32 logback-classic-1.2.3.jar
...

How can we distribute the filenames among the nodes so that each node fetches a roughly even share of the total file size?

This Gist contains an example that creates subsets of file names, each grouped so that its total file size is around 10MB. By running this preprocessing before distributing the list via a Remote Process Group, each node processes a roughly even amount of data.
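
The grouping itself is a simple greedy pass over the file list: keep adding files to the current group until its cumulative size crosses the threshold, then start a new group. Here is a minimal sketch of that idea in plain Groovy, independent of NiFi; the file names and sizes are illustrative values:

// Greedy grouping sketch (illustrative; not part of the NiFi flow itself).
def maxSize = 1024 * 1024 * 10 // 10MB threshold, same as the reader below

def files = [
    [filename: 'example-a.nar', filesize: 11360457L],
    [filename: 'example-b.jar', filesize: 35197L],
    [filename: 'example-c.nar', filesize: 7584349L],
    [filename: 'example-d.jar', filesize: 258833L]
]

def groups = []
def current = null
files.each { file ->
    if (current == null) {
        current = [totalSize: 0L, files: []]
        groups << current
    }
    current.files << file
    current.totalSize += file.filesize
    // Close the current group once it exceeds the threshold;
    // the next file starts a new group.
    if (current.totalSize > maxSize) {
        current = null
    }
}

groups.each { println "${it.totalSize}: ${it.files*.filename}" }

The ScriptedReader script below applies the same logic, but emits each group as a NiFi Record so that SplitRecord can turn every group into its own FlowFile.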

The resulting FlowFiles contain Records like the ones below. Some contain only a single file, because that file alone is around 10MB:

{
  "totalSize" : 11360457,
  "files" : [ {
    "filename" : "nifi-hl7-nar-1.3.0-SNAPSHOT.nar",
    "filesize" : "11360457"
  } ]
}

Others contain multiple files adding up to around 10MB in total:

{
  "totalSize" : 12357729,
  "files" : [ {
    "filename" : "nifi-nar-utils-1.3.0-SNAPSHOT.jar",
    "filesize" : "35197"
  }, {
    "filename" : "nifi-site-to-site-reporting-nar-1.3.0-SNAPSHOT.nar",
    "filesize" : "7584349"
  }, {
    "filename" : "jackson-core-2.6.1.jar",
    "filesize" : "258833"
  }, {
    "filename" : "nifi-snmp-nar-1.3.0-SNAPSHOT.nar",
    "filesize" : "933144"
  }, {
    "filename" : "nifi-social-media-nar-1.3.0-SNAPSHOT.nar",
    "filesize" : "3546206"
  } ]
}

The key ingredient is this Groovy script, used by the ScriptedReader:

import groovy.json.JsonSlurper
import org.apache.nifi.controller.AbstractControllerService
import org.apache.nifi.controller.ConfigurationContext
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.schema.access.SchemaNotFoundException
import org.apache.nifi.serialization.MalformedRecordException
import org.apache.nifi.serialization.RecordReader
import org.apache.nifi.serialization.RecordReaderFactory
import org.apache.nifi.serialization.SimpleRecordSchema
import org.apache.nifi.serialization.record.MapRecord
import org.apache.nifi.serialization.record.Record
import org.apache.nifi.serialization.record.RecordField
import org.apache.nifi.serialization.record.RecordFieldType
import org.apache.nifi.serialization.record.RecordSchema
import org.apache.nifi.serialization.record.type.ArrayDataType
import org.apache.nifi.serialization.record.type.MapDataType
import org.apache.nifi.serialization.record.type.RecordDataType


class FilesReader implements RecordReader {

    def recordIterator
    def recordSchema

    FilesReader(final RecordSchema filesSchema, final RecordSchema fileSchema, final InputStream inputStream) {
        recordSchema = filesSchema
        def inputFiles = new JsonSlurper().parse(inputStream)
        def results = new ArrayList<Record>()
        def outputFiles
        def record
        // 10MB
        def maxSize = 1024 * 1024 * 10
        for (Map file : inputFiles['files']) {
            if (record == null) {
                record = new MapRecord(filesSchema, new HashMap())
                record.setValue('totalSize', 0)
                results.add(record)
                outputFiles = new ArrayList<Record>()
            }

            outputFiles.add(new MapRecord(fileSchema, file))
            def totalSize = record.getAsLong('totalSize') + Long.parseLong(file['filesize'])
            record.setValue('totalSize', totalSize)

            if (totalSize > maxSize) {
                // Finalize the current record; the next file starts a new one
                record.setValue('files', outputFiles.toArray(new Record[outputFiles.size()]))
                record = null
                outputFiles = null
            }
        }

        if (record != null && outputFiles != null) {
            record.setValue('files', outputFiles.toArray(new Record[outputFiles.size()]))
        }

        recordIterator = results.iterator()
    }

    Record nextRecord() throws IOException, MalformedRecordException {
        return recordIterator?.hasNext() ? recordIterator.next() : null
    }

    RecordSchema getSchema() throws MalformedRecordException {
        return recordSchema
    }

    void close() throws IOException {
    }
}

class FilesRecordReaderFactory extends AbstractControllerService implements RecordReaderFactory {

    // Will be set by the ScriptedRecordReaderFactory
    ConfigurationContext configurationContext

    RecordReader createRecordReader(FlowFile flowFile, InputStream inputStream, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {

        // Build the record schemas: one for each file entry, one for the grouped result
        def fileFields = new ArrayList<RecordField>()
        fileFields.add(new RecordField("filesize", RecordFieldType.STRING.dataType))
        fileFields.add(new RecordField("filename", RecordFieldType.STRING.dataType))
        def fileSchema = new SimpleRecordSchema(fileFields)

        def fields = new ArrayList<RecordField>()
        fields.add(new RecordField("totalSize", RecordFieldType.LONG.dataType))
        fields.add(new RecordField("files", new ArrayDataType(new RecordDataType(fileSchema))))
        def filesSchema = new SimpleRecordSchema(fields)

        return new FilesReader(filesSchema, fileSchema, inputStream)
    }

}

// Create an instance of RecordReaderFactory called "reader"; this is the entry point for the ScriptedReader
reader = new FilesRecordReaderFactory()
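
If you want to sanity-check the reader outside of a running flow, a snippet like the following can be appended temporarily to the script body (or run in a Groovy shell with the NiFi serialization classes on the classpath). The sample JSON and file names are made up:

// Hypothetical local check: build the same schemas the factory builds and feed sample JSON.
def testFileSchema = new SimpleRecordSchema([
    new RecordField('filesize', RecordFieldType.STRING.dataType),
    new RecordField('filename', RecordFieldType.STRING.dataType)
])
def testFilesSchema = new SimpleRecordSchema([
    new RecordField('totalSize', RecordFieldType.LONG.dataType),
    new RecordField('files', new ArrayDataType(new RecordDataType(testFileSchema)))
])

def sampleJson = '''{"files":[
  {"filename":"sample-a.nar","filesize":"11360457"},
  {"filename":"sample-b.jar","filesize":"35197"},
  {"filename":"sample-c.nar","filesize":"7584349"}
]}'''

def testReader = new FilesReader(testFilesSchema, testFileSchema,
        new ByteArrayInputStream(sampleJson.getBytes('UTF-8')))
def testRecord
while ((testRecord = testReader.nextRecord()) != null) {
    println "totalSize=${testRecord.getAsLong('totalSize')}"
}

The complete flow template exported from NiFi (DistributeByFileSize.xml) follows. It wires ListFile, UpdateAttribute, AttributesToJSON and MergeContent to build the JSON file list, then a SplitRecord processor configured with this ScriptedReader and a JsonRecordSetWriter to emit one FlowFile per group: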
<?xml version="1.0" ?>
<template encoding-version="1.1">
<description></description>
<groupId>1e8778a8-015c-1000-c747-8273276bab45</groupId>
<name>DistributeByFileSize</name>
<snippet>
<processGroups>
<id>037b7341-8a8c-3047-0000-000000000000</id>
<parentGroupId>5454b790-94fe-3e28-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>0.0</y>
</position>
<comments></comments>
<contents>
<connections>
<id>9ad80462-18fe-363a-0000-000000000000</id>
<parentGroupId>037b7341-8a8c-3047-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>1726.132080078125</x>
<y>547.9493408203125</y>
</bends>
<destination>
<groupId>037b7341-8a8c-3047-0000-000000000000</groupId>
<id>5b193af6-c04b-3d62-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>0</labelIndex>
<name></name>
<selectedRelationships>merged</selectedRelationships>
<source>
<groupId>037b7341-8a8c-3047-0000-000000000000</groupId>
<id>9edb36ec-f1fe-38a4-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>9f6f7f38-ba09-34fa-0000-000000000000</id>
<parentGroupId>037b7341-8a8c-3047-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>1723.1422119140625</x>
<y>423.8709411621094</y>
</bends>
<destination>
<groupId>037b7341-8a8c-3047-0000-000000000000</groupId>
<id>9edb36ec-f1fe-38a4-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>0</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>037b7341-8a8c-3047-0000-000000000000</groupId>
<id>2de05bd1-026d-32fe-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>e8a8af7f-fa85-332b-0000-000000000000</id>
<parentGroupId>037b7341-8a8c-3047-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>037b7341-8a8c-3047-0000-000000000000</groupId>
<id>49dcbc11-f4e5-386b-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>037b7341-8a8c-3047-0000-000000000000</groupId>
<id>32ae0c42-b717-3798-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>14661ec2-c902-3e7b-0000-000000000000</id>
<parentGroupId>037b7341-8a8c-3047-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>037b7341-8a8c-3047-0000-000000000000</groupId>
<id>86344728-80cc-30d6-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>0</labelIndex>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<source>
<groupId>037b7341-8a8c-3047-0000-000000000000</groupId>
<id>9edb36ec-f1fe-38a4-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>265072b4-4af5-33b7-0000-000000000000</id>
<parentGroupId>037b7341-8a8c-3047-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>1724.6370849609375</x>
<y>686.9769897460938</y>
</bends>
<destination>
<groupId>037b7341-8a8c-3047-0000-000000000000</groupId>
<id>d96a99d1-91f7-3f09-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>0</labelIndex>
<name></name>
<selectedRelationships>splits</selectedRelationships>
<source>
<groupId>037b7341-8a8c-3047-0000-000000000000</groupId>
<id>5b193af6-c04b-3d62-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>4962eaaa-cd97-3c29-0000-000000000000</id>
<parentGroupId>037b7341-8a8c-3047-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>1726.132080078125</x>
<y>262.4195251464844</y>
</bends>
<destination>
<groupId>037b7341-8a8c-3047-0000-000000000000</groupId>
<id>2de05bd1-026d-32fe-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>0</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>037b7341-8a8c-3047-0000-000000000000</groupId>
<id>49dcbc11-f4e5-386b-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>66940c5a-d222-3069-0000-000000000000</id>
<parentGroupId>037b7341-8a8c-3047-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>037b7341-8a8c-3047-0000-000000000000</groupId>
<id>86344728-80cc-30d6-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<source>
<groupId>037b7341-8a8c-3047-0000-000000000000</groupId>
<id>2de05bd1-026d-32fe-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>7b467b5a-dd0c-3d49-0000-000000000000</id>
<parentGroupId>037b7341-8a8c-3047-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>037b7341-8a8c-3047-0000-000000000000</groupId>
<id>86344728-80cc-30d6-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>0</labelIndex>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<source>
<groupId>037b7341-8a8c-3047-0000-000000000000</groupId>
<id>5b193af6-c04b-3d62-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<controllerServices>
<id>cc843758-c4cc-3a27-0000-000000000000</id>
<parentGroupId>037b7341-8a8c-3047-0000-000000000000</parentGroupId>
<bundle>
<artifact>nifi-record-serialization-services-nar</artifact>
<group>org.apache.nifi</group>
<version>1.3.0-SNAPSHOT</version>
</bundle>
<comments></comments>
<descriptors>
<entry>
<key>Schema Write Strategy</key>
<value>
<name>Schema Write Strategy</name>
</value>
</entry>
<entry>
<key>schema-access-strategy</key>
<value>
<name>schema-access-strategy</name>
</value>
</entry>
<entry>
<key>schema-registry</key>
<value>
<identifiesControllerService>org.apache.nifi.schemaregistry.services.SchemaRegistry</identifiesControllerService>
<name>schema-registry</name>
</value>
</entry>
<entry>
<key>schema-name</key>
<value>
<name>schema-name</name>
</value>
</entry>
<entry>
<key>schema-text</key>
<value>
<name>schema-text</name>
</value>
</entry>
<entry>
<key>Date Format</key>
<value>
<name>Date Format</name>
</value>
</entry>
<entry>
<key>Time Format</key>
<value>
<name>Time Format</name>
</value>
</entry>
<entry>
<key>Timestamp Format</key>
<value>
<name>Timestamp Format</name>
</value>
</entry>
<entry>
<key>Pretty Print JSON</key>
<value>
<name>Pretty Print JSON</name>
</value>
</entry>
</descriptors>
<name>JsonRecordSetWriter</name>
<persistsState>false</persistsState>
<properties>
<entry>
<key>Schema Write Strategy</key>
<value>schema-name</value>
</entry>
<entry>
<key>schema-access-strategy</key>
<value>schema-name</value>
</entry>
<entry>
<key>schema-registry</key>
<value>f3458bb9-25f0-368e-0000-000000000000</value>
</entry>
<entry>
<key>schema-name</key>
<value>${schema.name}</value>
</entry>
<entry>
<key>schema-text</key>
<value>${avro.schema}</value>
</entry>
<entry>
<key>Date Format</key>
</entry>
<entry>
<key>Time Format</key>
</entry>
<entry>
<key>Timestamp Format</key>
</entry>
<entry>
<key>Pretty Print JSON</key>
<value>false</value>
</entry>
</properties>
<state>ENABLED</state>
<type>org.apache.nifi.json.JsonRecordSetWriter</type>
</controllerServices>
<controllerServices>
<id>f3458bb9-25f0-368e-0000-000000000000</id>
<parentGroupId>037b7341-8a8c-3047-0000-000000000000</parentGroupId>
<bundle>
<artifact>nifi-registry-nar</artifact>
<group>org.apache.nifi</group>
<version>1.3.0-SNAPSHOT</version>
</bundle>
<comments></comments>
<descriptors>
<entry>
<key>files</key>
<value>
<name>files</name>
</value>
</entry>
</descriptors>
<name>AvroSchemaRegistry</name>
<persistsState>false</persistsState>
<properties>
<entry>
<key>files</key>
<value>{
"type": "record",
"name": "files",
"fields": [
{
"name": "totalSize",
"type": [
"null",
"long"
]
},
{
"name": "files",
"type": {
"type": "array",
"items": {
"name": "file",
"type": "record",
"fields": [
{
"name": "filename",
"type": "string"
},
{
"name": "filesize",
"type": "string"
}
]
}
}
}
]
}</value>
</entry>
</properties>
<state>ENABLED</state>
<type>org.apache.nifi.schemaregistry.services.AvroSchemaRegistry</type>
</controllerServices>
<controllerServices>
<id>196a6041-7aba-354d-0000-000000000000</id>
<parentGroupId>037b7341-8a8c-3047-0000-000000000000</parentGroupId>
<bundle>
<artifact>nifi-scripting-nar</artifact>
<group>org.apache.nifi</group>
<version>1.3.0-SNAPSHOT</version>
</bundle>
<comments></comments>
<descriptors>
<entry>
<key>Script Engine</key>
<value>
<name>Script Engine</name>
</value>
</entry>
<entry>
<key>Script File</key>
<value>
<name>Script File</name>
</value>
</entry>
<entry>
<key>Script Body</key>
<value>
<name>Script Body</name>
</value>
</entry>
<entry>
<key>Module Directory</key>
<value>
<name>Module Directory</name>
</value>
</entry>
</descriptors>
<name>ScriptedReader</name>
<persistsState>false</persistsState>
<properties>
<entry>
<key>Script Engine</key>
<value>Groovy</value>
</entry>
<entry>
<key>Script File</key>
</entry>
<entry>
<key>Script Body</key>
<value>import groovy.json.JsonSlurper
import org.apache.nifi.controller.AbstractControllerService
import org.apache.nifi.controller.ConfigurationContext
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.schema.access.SchemaNotFoundException
import org.apache.nifi.serialization.MalformedRecordException
import org.apache.nifi.serialization.RecordReader
import org.apache.nifi.serialization.RecordReaderFactory
import org.apache.nifi.serialization.SimpleRecordSchema
import org.apache.nifi.serialization.record.MapRecord
import org.apache.nifi.serialization.record.Record
import org.apache.nifi.serialization.record.RecordField
import org.apache.nifi.serialization.record.RecordFieldType
import org.apache.nifi.serialization.record.RecordSchema
import org.apache.nifi.serialization.record.type.ArrayDataType
import org.apache.nifi.serialization.record.type.MapDataType
import org.apache.nifi.serialization.record.type.RecordDataType
class FilesReader implements RecordReader {
def recordIterator
def recordSchema
FilesReader(final RecordSchema filesSchema, final RecordSchema fileSchema, final InputStream inputStream) {
recordSchema = filesSchema
def inputFiles = new JsonSlurper().parse(inputStream)
def results = new ArrayList&lt;Record&gt;()
def outputFiles
def record
// 10MB
def maxSize = 1024 * 1024 * 10
for (Map file : inputFiles['files']) {
if (record == null) {
record = new MapRecord(filesSchema, new HashMap())
record.setValue('totalSize', 0)
results.add(record)
outputFiles = new ArrayList&lt;Record&gt;()
}
outputFiles.add(new MapRecord(fileSchema, file))
def totalSize = record.getAsLong('totalSize') + Long.parseLong(file['filesize'])
record.setValue('totalSize', totalSize)
if (totalSize &gt; maxSize) {
// Create new record
record.setValue('files', outputFiles.toArray(new Record[outputFiles.size()]))
record = null
outputFiles = null
}
}
if (record != null &amp;&amp; outputFiles != null) {
record.setValue('files', outputFiles.toArray(new Record[outputFiles.size()]))
}
recordIterator = results.iterator()
}
Record nextRecord() throws IOException, MalformedRecordException {
return recordIterator?.hasNext() ? recordIterator.next() : null
}
RecordSchema getSchema() throws MalformedRecordException {
return recordSchema
}
void close() throws IOException {
}
}
class FilesRecordReaderFactory extends AbstractControllerService implements RecordReaderFactory {
// Will be set by the ScriptedRecordReaderFactory
ConfigurationContext configurationContext
RecordReader createRecordReader(FlowFile flowFile, InputStream inputStream, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
// Create schema here
def fileFields = new ArrayList&lt;RecordField&gt;()
fileFields.add(new RecordField("filesize", RecordFieldType.STRING.dataType))
fileFields.add(new RecordField("filename", RecordFieldType.STRING.dataType))
def fileSchema = new SimpleRecordSchema(fileFields)
def fields = new ArrayList&lt;RecordField&gt;()
fields.add(new RecordField("totalSize", RecordFieldType.LONG.dataType))
fields.add(new RecordField("files", new ArrayDataType(new RecordDataType(fileSchema))))
def filesSchema = new SimpleRecordSchema(fields)
return new FilesReader(filesSchema, fileSchema, inputStream)
}
}
// Create an instance of RecordReaderFactory called "writer", this is the entry point for ScriptedReader
reader = new FilesRecordReaderFactory()
</value>
</entry>
<entry>
<key>Module Directory</key>
</entry>
</properties>
<state>ENABLED</state>
<type>org.apache.nifi.record.script.ScriptedReader</type>
</controllerServices>
<processors>
<id>86344728-80cc-30d6-0000-000000000000</id>
<parentGroupId>037b7341-8a8c-3047-0000-000000000000</parentGroupId>
<position>
<x>604.8449425312165</x>
<y>391.5280512137083</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.3.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Log Level</key>
<value>
<name>Log Level</name>
</value>
</entry>
<entry>
<key>Log Payload</key>
<value>
<name>Log Payload</name>
</value>
</entry>
<entry>
<key>Attributes to Log</key>
<value>
<name>Attributes to Log</name>
</value>
</entry>
<entry>
<key>Attributes to Ignore</key>
<value>
<name>Attributes to Ignore</name>
</value>
</entry>
<entry>
<key>Log prefix</key>
<value>
<name>Log prefix</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Log Level</key>
<value>info</value>
</entry>
<entry>
<key>Log Payload</key>
<value>false</value>
</entry>
<entry>
<key>Attributes to Log</key>
</entry>
<entry>
<key>Attributes to Ignore</key>
</entry>
<entry>
<key>Log prefix</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>LogAttribute</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.LogAttribute</type>
</processors>
<processors>
<id>9edb36ec-f1fe-38a4-0000-000000000000</id>
<parentGroupId>037b7341-8a8c-3047-0000-000000000000</parentGroupId>
<position>
<x>1232.5487017764017</x>
<y>407.6196997100657</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.3.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Merge Strategy</key>
<value>
<name>Merge Strategy</name>
</value>
</entry>
<entry>
<key>Merge Format</key>
<value>
<name>Merge Format</name>
</value>
</entry>
<entry>
<key>Attribute Strategy</key>
<value>
<name>Attribute Strategy</name>
</value>
</entry>
<entry>
<key>Correlation Attribute Name</key>
<value>
<name>Correlation Attribute Name</name>
</value>
</entry>
<entry>
<key>Minimum Number of Entries</key>
<value>
<name>Minimum Number of Entries</name>
</value>
</entry>
<entry>
<key>Maximum Number of Entries</key>
<value>
<name>Maximum Number of Entries</name>
</value>
</entry>
<entry>
<key>Minimum Group Size</key>
<value>
<name>Minimum Group Size</name>
</value>
</entry>
<entry>
<key>Maximum Group Size</key>
<value>
<name>Maximum Group Size</name>
</value>
</entry>
<entry>
<key>Max Bin Age</key>
<value>
<name>Max Bin Age</name>
</value>
</entry>
<entry>
<key>Maximum number of Bins</key>
<value>
<name>Maximum number of Bins</name>
</value>
</entry>
<entry>
<key>Delimiter Strategy</key>
<value>
<name>Delimiter Strategy</name>
</value>
</entry>
<entry>
<key>Header File</key>
<value>
<name>Header File</name>
</value>
</entry>
<entry>
<key>Footer File</key>
<value>
<name>Footer File</name>
</value>
</entry>
<entry>
<key>Demarcator File</key>
<value>
<name>Demarcator File</name>
</value>
</entry>
<entry>
<key>Compression Level</key>
<value>
<name>Compression Level</name>
</value>
</entry>
<entry>
<key>Keep Path</key>
<value>
<name>Keep Path</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Merge Strategy</key>
<value>Bin-Packing Algorithm</value>
</entry>
<entry>
<key>Merge Format</key>
<value>Binary Concatenation</value>
</entry>
<entry>
<key>Attribute Strategy</key>
<value>Keep Only Common Attributes</value>
</entry>
<entry>
<key>Correlation Attribute Name</key>
</entry>
<entry>
<key>Minimum Number of Entries</key>
<value>1</value>
</entry>
<entry>
<key>Maximum Number of Entries</key>
<value>100</value>
</entry>
<entry>
<key>Minimum Group Size</key>
<value>0 B</value>
</entry>
<entry>
<key>Maximum Group Size</key>
</entry>
<entry>
<key>Max Bin Age</key>
</entry>
<entry>
<key>Maximum number of Bins</key>
<value>5</value>
</entry>
<entry>
<key>Delimiter Strategy</key>
<value>Text</value>
</entry>
<entry>
<key>Header File</key>
<value>{"files":[</value>
</entry>
<entry>
<key>Footer File</key>
<value>]}</value>
</entry>
<entry>
<key>Demarcator File</key>
<value>,</value>
</entry>
<entry>
<key>Compression Level</key>
<value>1</value>
</entry>
<entry>
<key>Keep Path</key>
<value>false</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>MergeContent</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>merged</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>original</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.MergeContent</type>
</processors>
<processors>
<id>d96a99d1-91f7-3f09-0000-000000000000</id>
<parentGroupId>037b7341-8a8c-3047-0000-000000000000</parentGroupId>
<position>
<x>1232.5487017764017</x>
<y>704.459125979597</y>
</position>
<bundle>
<artifact>nifi-update-attribute-nar</artifact>
<group>org.apache.nifi</group>
<version>1.3.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Delete Attributes Expression</key>
<value>
<name>Delete Attributes Expression</name>
</value>
</entry>
<entry>
<key>Store State</key>
<value>
<name>Store State</name>
</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
<value>
<name>Stateful Variables Initial Value</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Delete Attributes Expression</key>
</entry>
<entry>
<key>Store State</key>
<value>Do not store state</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>UpdateAttribute</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
<processors>
<id>2de05bd1-026d-32fe-0000-000000000000</id>
<parentGroupId>037b7341-8a8c-3047-0000-000000000000</parentGroupId>
<position>
<x>1232.5487017764017</x>
<y>266.55362915342505</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.3.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Attributes List</key>
<value>
<name>Attributes List</name>
</value>
</entry>
<entry>
<key>Destination</key>
<value>
<name>Destination</name>
</value>
</entry>
<entry>
<key>Include Core Attributes</key>
<value>
<name>Include Core Attributes</name>
</value>
</entry>
<entry>
<key>Null Value</key>
<value>
<name>Null Value</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Attributes List</key>
<value>filesize,filename</value>
</entry>
<entry>
<key>Destination</key>
<value>flowfile-content</value>
</entry>
<entry>
<key>Include Core Attributes</key>
<value>true</value>
</entry>
<entry>
<key>Null Value</key>
<value>false</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>AttributesToJSON</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.AttributesToJSON</type>
</processors>
<processors>
<id>32ae0c42-b717-3798-0000-000000000000</id>
<parentGroupId>037b7341-8a8c-3047-0000-000000000000</parentGroupId>
<position>
<x>604.8449425312165</x>
<y>120.56606506651099</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.3.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Input Directory</key>
<value>
<name>Input Directory</name>
</value>
</entry>
<entry>
<key>Recurse Subdirectories</key>
<value>
<name>Recurse Subdirectories</name>
</value>
</entry>
<entry>
<key>Input Directory Location</key>
<value>
<name>Input Directory Location</name>
</value>
</entry>
<entry>
<key>File Filter</key>
<value>
<name>File Filter</name>
</value>
</entry>
<entry>
<key>Path Filter</key>
<value>
<name>Path Filter</name>
</value>
</entry>
<entry>
<key>Minimum File Age</key>
<value>
<name>Minimum File Age</name>
</value>
</entry>
<entry>
<key>Maximum File Age</key>
<value>
<name>Maximum File Age</name>
</value>
</entry>
<entry>
<key>Minimum File Size</key>
<value>
<name>Minimum File Size</name>
</value>
</entry>
<entry>
<key>Maximum File Size</key>
<value>
<name>Maximum File Size</name>
</value>
</entry>
<entry>
<key>Ignore Hidden Files</key>
<value>
<name>Ignore Hidden Files</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Input Directory</key>
<value>/Users/koji/dev/nifi/nifi-assembly/target/nifi-1.3.0-SNAPSHOT-bin/nifi-1.3.0-SNAPSHOT/lib</value>
</entry>
<entry>
<key>Recurse Subdirectories</key>
<value>true</value>
</entry>
<entry>
<key>Input Directory Location</key>
<value>Local</value>
</entry>
<entry>
<key>File Filter</key>
<value>[^\.].*</value>
</entry>
<entry>
<key>Path Filter</key>
</entry>
<entry>
<key>Minimum File Age</key>
<value>0 sec</value>
</entry>
<entry>
<key>Maximum File Age</key>
</entry>
<entry>
<key>Minimum File Size</key>
<value>0 B</value>
</entry>
<entry>
<key>Maximum File Size</key>
</entry>
<entry>
<key>Ignore Hidden Files</key>
<value>true</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>1d</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>ListFile</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.ListFile</type>
</processors>
<processors>
<id>49dcbc11-f4e5-386b-0000-000000000000</id>
<parentGroupId>037b7341-8a8c-3047-0000-000000000000</parentGroupId>
<position>
<x>1232.5487017764017</x>
<y>116.24006775205794</y>
</position>
<bundle>
<artifact>nifi-update-attribute-nar</artifact>
<group>org.apache.nifi</group>
<version>1.3.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Delete Attributes Expression</key>
<value>
<name>Delete Attributes Expression</name>
</value>
</entry>
<entry>
<key>Store State</key>
<value>
<name>Store State</name>
</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
<value>
<name>Stateful Variables Initial Value</name>
</value>
</entry>
<entry>
<key>filesize</key>
<value>
<name>filesize</name>
</value>
</entry>
<entry>
<key>schema.name</key>
<value>
<name>schema.name</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Delete Attributes Expression</key>
</entry>
<entry>
<key>Store State</key>
<value>Do not store state</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
</entry>
<entry>
<key>filesize</key>
<value>${file.size}</value>
</entry>
<entry>
<key>schema.name</key>
<value>files</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>UpdateAttribute</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
<processors>
<id>5b193af6-c04b-3d62-0000-000000000000</id>
<parentGroupId>037b7341-8a8c-3047-0000-000000000000</parentGroupId>
<position>
<x>1232.5487017764017</x>
<y>550.7273144561594</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.3.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Record Reader</key>
<value>
<identifiesControllerService>org.apache.nifi.serialization.RecordReaderFactory</identifiesControllerService>
<name>Record Reader</name>
</value>
</entry>
<entry>
<key>Record Writer</key>
<value>
<identifiesControllerService>org.apache.nifi.serialization.RecordSetWriterFactory</identifiesControllerService>
<name>Record Writer</name>
</value>
</entry>
<entry>
<key>Records Per Split</key>
<value>
<name>Records Per Split</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Record Reader</key>
<value>196a6041-7aba-354d-0000-000000000000</value>
</entry>
<entry>
<key>Record Writer</key>
<value>cc843758-c4cc-3a27-0000-000000000000</value>
</entry>
<entry>
<key>Records Per Split</key>
<value>1</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Split Files around 10MB chunk</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>true</autoTerminate>
<name>original</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>splits</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.SplitRecord</type>
</processors>
</contents>
<name>DistributeByFileSize</name>
</processGroups>
</snippet>
<timestamp>06/03/2017 13:27:53 JST</timestamp>
</template>