Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?

NiFi Example: Remove a Cache Entry with InvokeScriptedProcessor

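When no existing processor does what you need, you can write one yourself with InvokeScriptedProcessor. The Groovy script below removes, for each incoming FlowFile, the Distributed Map Cache entry whose key is given by the "Cache Entry Identifier" property. The NiFi template that follows uses it to clear entries recorded by DetectDuplicate once a FlowFile has been processed.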

import org.apache.commons.lang3.StringUtils
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient
import org.apache.nifi.distributed.cache.client.Serializer
import org.apache.nifi.distributed.cache.client.exception.SerializationException
import org.apache.nifi.expression.AttributeExpression
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.processor.util.StandardValidators
import org.apache.nifi.script.ScriptingComponentUtils

import java.nio.charset.StandardCharsets

/**
 * Processor hosted by InvokeScriptedProcessor that removes one entry from a
 * DistributedMapCacheClient for every incoming FlowFile.
 *
 * The cache key is obtained by evaluating the "Cache Entry Identifier" property
 * (Expression Language, default ${hash.value}) against the FlowFile.
 *
 * Routing:
 *   - success: cache.remove(...) returned without throwing
 *   - failure: the evaluated key is blank, or cache.remove(...) threw an IOException;
 *              in both cases the FlowFile is penalized before being transferred
 */
class RemoveCache extends AbstractProcessor {

    public static final PropertyDescriptor PROP_DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
            .name("Distributed Cache Service")
            .description("The Controller Service that holds the cache entries to be removed.")
            .required(true)
            .identifiesControllerService(DistributedMapCacheClient.class)
            .build()

    public static final PropertyDescriptor PROP_CACHE_ENTRY_IDENTIFIER = new PropertyDescriptor.Builder()
            .name("Cache Entry Identifier")
            .description("A FlowFile attribute, or the results of an Attribute Expression Language statement, which will be evaluated "
            + "against a FlowFile in order to determine the key of the cache entry to remove")
            .required(true)
            .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
            // Single quotes keep Groovy from treating ${hash.value} as GString interpolation.
            .defaultValue('${hash.value}')
            .expressionLanguageSupported(true)
            .build()

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("If the cache was successfully communicated with it will be routed to this relationship")
            .build()
    // NOTE: not registered in getRelationships() and never routed to by onTrigger().
    // Registering it would require every existing flow to connect or auto-terminate it.
    public static final Relationship REL_NOT_FOUND = new Relationship.Builder()
            .name("not-found")
            .description("If a FlowFile's Cache Entry Identifier was not found in the cache, it will be routed to this relationship")
            .build()
    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("If unable to communicate with the cache or if the cache entry is evaluated to be blank, the FlowFile will be penalized and routed to this relationship")
            .build()

    /** Relationships exposed by this processor: success and failure. */
    private final Set<Relationship> relationships

    /** Serializes String cache keys as UTF-8 bytes. */
    private final Serializer<String> keySerializer = new Serializer<String>() {
        @Override
        void serialize(String value, OutputStream output) throws SerializationException, IOException {
            output.write(value.getBytes(StandardCharsets.UTF_8))
        }
    }

    RemoveCache() {
        final Set<Relationship> rels = new HashSet<>()
        rels.add(REL_SUCCESS)
        rels.add(REL_FAILURE)
        relationships = Collections.unmodifiableSet(rels)
    }

    /**
     * Returns the script-related properties followed by this processor's own properties.
     * The list is rebuilt on every call.
     */
    @Override
    List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> descriptors = new ArrayList<>()
        descriptors.add(ScriptingComponentUtils.SCRIPT_FILE)
        descriptors.add(ScriptingComponentUtils.SCRIPT_BODY)
        descriptors.add(ScriptingComponentUtils.MODULES)
        descriptors.add(PROP_CACHE_ENTRY_IDENTIFIER)
        descriptors.add(PROP_DISTRIBUTED_CACHE_SERVICE)
        return descriptors
    }

    /** Returns the unmodifiable set built in the constructor (success, failure). */
    @Override
    Set<Relationship> getRelationships() {
        return relationships
    }

    /**
     * Removes the cache entry identified by the evaluated Cache Entry Identifier for one FlowFile.
     *
     * @param context provides the configured properties and controller service
     * @param session used to fetch, penalize and transfer the FlowFile
     */
    @Override
    void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get()
        if (flowFile == null) {
            return
        }

        final ComponentLog logger = getLogger()
        final String cacheKey = context.getProperty(PROP_CACHE_ENTRY_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue()
        if (StringUtils.isBlank(cacheKey)) {
            // Explicit Object[] so Groovy binds to error(String, Object[]) with one element per '{}';
            // a bare list literal may be bound as a single vararg element instead.
            logger.error("FlowFile {} has no attribute for given Cache Entry Identifier", [flowFile] as Object[])
            flowFile = session.penalize(flowFile)
            session.transfer(flowFile, REL_FAILURE)
            return
        }

        final DistributedMapCacheClient cache = context.getProperty(PROP_DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class)
        try {
            // The result of remove() is ignored: with no "not-found" relationship registered,
            // the FlowFile goes to success whenever the call does not throw.
            cache.remove(cacheKey, keySerializer)
            session.transfer(flowFile, REL_SUCCESS)
        } catch (final IOException e) {
            flowFile = session.penalize(flowFile)
            session.transfer(flowFile, REL_FAILURE)
            // NOTE(review): NiFi's component logger treats a trailing Throwable in the argument
            // array as the cause and logs its stack trace. Confirm for the NiFi version in use.
            logger.error("Unable to communicate with cache when processing {} due to {}", [flowFile, e] as Object[])
        }
    }



}

// Expose the Processor instance to InvokeScriptedProcessor through the script-level
// variable `processor`. NOTE(review): InvokeScriptedProcessor is expected to look up
// this exact name, so do not rename it. Confirm against its documentation.
processor = new RemoveCache()
<?xml version="1.0" ?>
<template encoding-version="1.1">
<description></description>
<groupId>0cae3b5d-015d-1000-b9ed-64b180b79ed8</groupId>
<name>RemoveCacheExample</name>
<snippet>
<processGroups>
<id>35fe8898-bd57-3b85-0000-000000000000</id>
<parentGroupId>6018e1bb-e364-3d31-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>0.0</y>
</position>
<comments></comments>
<contents>
<connections>
<id>a1d250a9-c861-3536-0000-000000000000</id>
<parentGroupId>35fe8898-bd57-3b85-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>681.4798583984375</x>
<y>594.5999145507812</y>
</bends>
<destination>
<groupId>35fe8898-bd57-3b85-0000-000000000000</groupId>
<id>b487b652-6262-3fe0-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>duplicate</selectedRelationships>
<source>
<groupId>35fe8898-bd57-3b85-0000-000000000000</groupId>
<id>996e015f-216b-3066-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>dd14aead-3c13-3555-0000-000000000000</id>
<parentGroupId>35fe8898-bd57-3b85-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>35fe8898-bd57-3b85-0000-000000000000</groupId>
<id>996e015f-216b-3066-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>35fe8898-bd57-3b85-0000-000000000000</groupId>
<id>40f5216d-1f3d-3d88-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>121f6822-4671-304c-0000-000000000000</id>
<parentGroupId>35fe8898-bd57-3b85-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>35fe8898-bd57-3b85-0000-000000000000</groupId>
<id>996e015f-216b-3066-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>35fe8898-bd57-3b85-0000-000000000000</groupId>
<id>54b9de9c-e795-32a8-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>23e41de5-2cff-3fa3-0000-000000000000</id>
<parentGroupId>35fe8898-bd57-3b85-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>35fe8898-bd57-3b85-0000-000000000000</groupId>
<id>66a203c2-9a9c-3c2f-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>35fe8898-bd57-3b85-0000-000000000000</groupId>
<id>3803fb76-ddad-3b91-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>38b486ca-bda7-3dfc-0000-000000000000</id>
<parentGroupId>35fe8898-bd57-3b85-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>906.119873046875</x>
<y>593.159912109375</y>
</bends>
<destination>
<groupId>35fe8898-bd57-3b85-0000-000000000000</groupId>
<id>b487b652-6262-3fe0-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>0</labelIndex>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<source>
<groupId>35fe8898-bd57-3b85-0000-000000000000</groupId>
<id>996e015f-216b-3066-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>3adf64fc-0e2e-3847-0000-000000000000</id>
<parentGroupId>35fe8898-bd57-3b85-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>35fe8898-bd57-3b85-0000-000000000000</groupId>
<id>3803fb76-ddad-3b91-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>non-duplicate</selectedRelationships>
<source>
<groupId>35fe8898-bd57-3b85-0000-000000000000</groupId>
<id>996e015f-216b-3066-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>3aefc250-d1a2-3716-0000-000000000000</id>
<parentGroupId>35fe8898-bd57-3b85-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>1100.519775390625</x>
<y>679.5599365234375</y>
</bends>
<destination>
<groupId>35fe8898-bd57-3b85-0000-000000000000</groupId>
<id>b487b652-6262-3fe0-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>0</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>35fe8898-bd57-3b85-0000-000000000000</groupId>
<id>66a203c2-9a9c-3c2f-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>43b75db5-8d01-3f12-0000-000000000000</id>
<parentGroupId>35fe8898-bd57-3b85-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>1096.1998291015625</x>
<y>760.1998901367188</y>
</bends>
<destination>
<groupId>35fe8898-bd57-3b85-0000-000000000000</groupId>
<id>b487b652-6262-3fe0-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<source>
<groupId>35fe8898-bd57-3b85-0000-000000000000</groupId>
<id>66a203c2-9a9c-3c2f-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<controllerServices>
<id>bd6674a6-3b20-3cdb-0000-000000000000</id>
<parentGroupId>35fe8898-bd57-3b85-0000-000000000000</parentGroupId>
<bundle>
<artifact>nifi-distributed-cache-services-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<comments></comments>
<descriptors>
<entry>
<key>Server Hostname</key>
<value>
<name>Server Hostname</name>
</value>
</entry>
<entry>
<key>Server Port</key>
<value>
<name>Server Port</name>
</value>
</entry>
<entry>
<key>SSL Context Service</key>
<value>
<identifiesControllerService>org.apache.nifi.ssl.SSLContextService</identifiesControllerService>
<name>SSL Context Service</name>
</value>
</entry>
<entry>
<key>Communications Timeout</key>
<value>
<name>Communications Timeout</name>
</value>
</entry>
</descriptors>
<name>DistributedMapCacheClientService</name>
<persistsState>false</persistsState>
<properties>
<entry>
<key>Server Hostname</key>
<value>localhost</value>
</entry>
<entry>
<key>Server Port</key>
</entry>
<entry>
<key>SSL Context Service</key>
</entry>
<entry>
<key>Communications Timeout</key>
</entry>
</properties>
<state>ENABLED</state>
<type>org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService</type>
</controllerServices>
<processors>
<id>996e015f-216b-3066-0000-000000000000</id>
<parentGroupId>35fe8898-bd57-3b85-0000-000000000000</parentGroupId>
<position>
<x>592.1999793111697</x>
<y>422.0600709829333</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Cache Entry Identifier</key>
<value>
<name>Cache Entry Identifier</name>
</value>
</entry>
<entry>
<key>FlowFile Description</key>
<value>
<name>FlowFile Description</name>
</value>
</entry>
<entry>
<key>Age Off Duration</key>
<value>
<name>Age Off Duration</name>
</value>
</entry>
<entry>
<key>Distributed Cache Service</key>
<value>
<identifiesControllerService>org.apache.nifi.distributed.cache.client.DistributedMapCacheClient</identifiesControllerService>
<name>Distributed Cache Service</name>
</value>
</entry>
<entry>
<key>Cache The Entry Identifier</key>
<value>
<name>Cache The Entry Identifier</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Cache Entry Identifier</key>
<value>${data.id}</value>
</entry>
<entry>
<key>FlowFile Description</key>
<value>duplicate</value>
</entry>
<entry>
<key>Age Off Duration</key>
</entry>
<entry>
<key>Distributed Cache Service</key>
<value>bd6674a6-3b20-3cdb-0000-000000000000</value>
</entry>
<entry>
<key>Cache The Entry Identifier</key>
<value>true</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>DetectDuplicate</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>duplicate</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>non-duplicate</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.DetectDuplicate</type>
</processors>
<processors>
<id>b487b652-6262-3fe0-0000-000000000000</id>
<parentGroupId>35fe8898-bd57-3b85-0000-000000000000</parentGroupId>
<position>
<x>593.6399817525762</x>
<y>657.9600044546129</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Log Level</key>
<value>
<name>Log Level</name>
</value>
</entry>
<entry>
<key>Log Payload</key>
<value>
<name>Log Payload</name>
</value>
</entry>
<entry>
<key>Attributes to Log</key>
<value>
<name>Attributes to Log</name>
</value>
</entry>
<entry>
<key>attributes-to-log-regex</key>
<value>
<name>attributes-to-log-regex</name>
</value>
</entry>
<entry>
<key>Attributes to Ignore</key>
<value>
<name>Attributes to Ignore</name>
</value>
</entry>
<entry>
<key>attributes-to-ignore-regex</key>
<value>
<name>attributes-to-ignore-regex</name>
</value>
</entry>
<entry>
<key>Log prefix</key>
<value>
<name>Log prefix</name>
</value>
</entry>
<entry>
<key>character-set</key>
<value>
<name>character-set</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Log Level</key>
<value>info</value>
</entry>
<entry>
<key>Log Payload</key>
<value>false</value>
</entry>
<entry>
<key>Attributes to Log</key>
</entry>
<entry>
<key>attributes-to-log-regex</key>
<value>.*</value>
</entry>
<entry>
<key>Attributes to Ignore</key>
</entry>
<entry>
<key>attributes-to-ignore-regex</key>
</entry>
<entry>
<key>Log prefix</key>
</entry>
<entry>
<key>character-set</key>
<value>UTF-8</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>LogAttribute</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.LogAttribute</type>
</processors>
<processors>
<id>3803fb76-ddad-3b91-0000-000000000000</id>
<parentGroupId>35fe8898-bd57-3b85-0000-000000000000</parentGroupId>
<position>
<x>1244.0598377096073</x>
<y>422.0600709829333</y>
</position>
<bundle>
<artifact>nifi-update-attribute-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Delete Attributes Expression</key>
<value>
<name>Delete Attributes Expression</name>
</value>
</entry>
<entry>
<key>Store State</key>
<value>
<name>Store State</name>
</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
<value>
<name>Stateful Variables Initial Value</name>
</value>
</entry>
<entry>
<key>data.processedAt</key>
<value>
<name>data.processedAt</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Delete Attributes Expression</key>
</entry>
<entry>
<key>Store State</key>
<value>Do not store state</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
</entry>
<entry>
<key>data.processedAt</key>
<value>${now()}</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Do something</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
<processors>
<id>40f5216d-1f3d-3d88-0000-000000000000</id>
<parentGroupId>35fe8898-bd57-3b85-0000-000000000000</parentGroupId>
<position>
<x>825.4399158346075</x>
<y>183.09997515773796</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>File Size</key>
<value>
<name>File Size</name>
</value>
</entry>
<entry>
<key>Batch Size</key>
<value>
<name>Batch Size</name>
</value>
</entry>
<entry>
<key>Data Format</key>
<value>
<name>Data Format</name>
</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>
<name>Unique FlowFiles</name>
</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>
<name>generate-ff-custom-text</name>
</value>
</entry>
<entry>
<key>data.id</key>
<value>
<name>data.id</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>File Size</key>
<value>0B</value>
</entry>
<entry>
<key>Batch Size</key>
<value>2</value>
</entry>
<entry>
<key>Data Format</key>
<value>Text</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>false</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>${now()}</value>
</entry>
<entry>
<key>data.id</key>
<value>B</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>1 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Generate 2 Bs per sec</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.GenerateFlowFile</type>
</processors>
<processors>
<id>54b9de9c-e795-32a8-0000-000000000000</id>
<parentGroupId>35fe8898-bd57-3b85-0000-000000000000</parentGroupId>
<position>
<x>373.03995245570127</x>
<y>183.09997515773796</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>File Size</key>
<value>
<name>File Size</name>
</value>
</entry>
<entry>
<key>Batch Size</key>
<value>
<name>Batch Size</name>
</value>
</entry>
<entry>
<key>Data Format</key>
<value>
<name>Data Format</name>
</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>
<name>Unique FlowFiles</name>
</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>
<name>generate-ff-custom-text</name>
</value>
</entry>
<entry>
<key>data.id</key>
<value>
<name>data.id</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>File Size</key>
<value>0B</value>
</entry>
<entry>
<key>Batch Size</key>
<value>2</value>
</entry>
<entry>
<key>Data Format</key>
<value>Text</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>false</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>${now()}</value>
</entry>
<entry>
<key>data.id</key>
<value>A</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>1 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Generate 2 As per sec</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.GenerateFlowFile</type>
</processors>
<processors>
<id>66a203c2-9a9c-3c2f-0000-000000000000</id>
<parentGroupId>35fe8898-bd57-3b85-0000-000000000000</parentGroupId>
<position>
<x>1244.0598377096073</x>
<y>657.2400715932847</y>
</position>
<bundle>
<artifact>nifi-scripting-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Script Engine</key>
<value>
<name>Script Engine</name>
</value>
</entry>
<entry>
<key>Script File</key>
<value>
<name>Script File</name>
</value>
</entry>
<entry>
<key>Script Body</key>
<value>
<name>Script Body</name>
</value>
</entry>
<entry>
<key>Module Directory</key>
<value>
<name>Module Directory</name>
</value>
</entry>
<entry>
<key>Cache Entry Identifier</key>
<value>
<name>Cache Entry Identifier</name>
</value>
</entry>
<entry>
<key>Distributed Cache Service</key>
<value>
<identifiesControllerService>org.apache.nifi.distributed.cache.client.DistributedMapCacheClient</identifiesControllerService>
<name>Distributed Cache Service</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Script Engine</key>
<value>Groovy</value>
</entry>
<entry>
<key>Script File</key>
</entry>
<entry>
<key>Script Body</key>
<value>import org.apache.commons.lang3.StringUtils
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient
import org.apache.nifi.distributed.cache.client.Serializer
import org.apache.nifi.distributed.cache.client.exception.SerializationException
import org.apache.nifi.expression.AttributeExpression
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.processor.util.StandardValidators
import org.apache.nifi.script.ScriptingComponentUtils
import java.nio.charset.StandardCharsets
// Removes one DistributedMapCacheClient entry per FlowFile. The key is the evaluated
// "Cache Entry Identifier". Blank keys and cache I/O errors go to failure (penalized).
class RemoveCache extends AbstractProcessor {
public static final PropertyDescriptor PROP_DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
.name("Distributed Cache Service")
.description("The Controller Service that holds the cache entries to be removed.")
.required(true)
.identifiesControllerService(DistributedMapCacheClient.class)
.build()
public static final PropertyDescriptor PROP_CACHE_ENTRY_IDENTIFIER = new PropertyDescriptor.Builder()
.name("Cache Entry Identifier")
.description("A FlowFile attribute, or the results of an Attribute Expression Language statement, which will be evaluated "
+ "against a FlowFile in order to determine the key of the cache entry to remove")
.required(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
.defaultValue('${hash.value}')
.expressionLanguageSupported(true)
.build()
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("If the cache was successfully communicated with it will be routed to this relationship")
.build()
// Not registered in getRelationships() and never routed to by onTrigger().
public static final Relationship REL_NOT_FOUND = new Relationship.Builder()
.name("not-found")
.description("If a FlowFile's Cache Entry Identifier was not found in the cache, it will be routed to this relationship")
.build()
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("If unable to communicate with the cache or if the cache entry is evaluated to be blank, the FlowFile will be penalized and routed to this relationship")
.build()
private final Set&lt;Relationship&gt; relationships
// Serializes String cache keys as UTF-8 bytes.
private final Serializer&lt;String&gt; keySerializer = new Serializer&lt;String&gt;() {
@Override
void serialize(String value, OutputStream output) throws SerializationException, IOException {
output.write(value.getBytes(StandardCharsets.UTF_8))
}
}
RemoveCache() {
final Set&lt;Relationship&gt; rels = new HashSet&lt;&gt;()
rels.add(REL_SUCCESS)
rels.add(REL_FAILURE)
relationships = Collections.unmodifiableSet(rels)
}
@Override
List&lt;PropertyDescriptor&gt; getSupportedPropertyDescriptors() {
final List&lt;PropertyDescriptor&gt; descriptors = new ArrayList&lt;&gt;()
descriptors.add(ScriptingComponentUtils.SCRIPT_FILE)
descriptors.add(ScriptingComponentUtils.SCRIPT_BODY)
descriptors.add(ScriptingComponentUtils.MODULES)
descriptors.add(PROP_CACHE_ENTRY_IDENTIFIER)
descriptors.add(PROP_DISTRIBUTED_CACHE_SERVICE)
return descriptors
}
@Override
Set&lt;Relationship&gt; getRelationships() {
return relationships
}
@Override
void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get()
if (flowFile == null) {
return
}
final ComponentLog logger = getLogger()
final String cacheKey = context.getProperty(PROP_CACHE_ENTRY_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue()
if (StringUtils.isBlank(cacheKey)) {
// Explicit Object[] so each element fills one placeholder (a bare list may bind as one argument).
logger.error("FlowFile {} has no attribute for given Cache Entry Identifier", [flowFile] as Object[])
flowFile = session.penalize(flowFile)
session.transfer(flowFile, REL_FAILURE)
return
}
final DistributedMapCacheClient cache = context.getProperty(PROP_DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class)
try {
// The result of remove() is ignored; any non-throwing call routes to success.
cache.remove(cacheKey, keySerializer)
session.transfer(flowFile, REL_SUCCESS)
} catch (final IOException e) {
flowFile = session.penalize(flowFile)
session.transfer(flowFile, REL_FAILURE)
// Trailing Throwable in the argument array lets the logger record the stack trace.
logger.error("Unable to communicate with cache when processing {} due to {}", [flowFile, e] as Object[])
}
}
}
processor = new RemoveCache()</value>
</entry>
<entry>
<key>Module Directory</key>
</entry>
<entry>
<key>Cache Entry Identifier</key>
<value>${data.id}</value>
</entry>
<entry>
<key>Distributed Cache Service</key>
<value>bd6674a6-3b20-3cdb-0000-000000000000</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>RemoveCache</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.script.InvokeScriptedProcessor</type>
</processors>
</contents>
<name>RemoveCacheExample</name>
</processGroups>
</snippet>
<timestamp>07/10/2017 10:35:41 JST</timestamp>
</template>
@behrouz-s

This comment has been minimized.

Copy link

commented May 1, 2019

It doesn't work!
This error is shown:
"Validation is invalid because an error occurred calling validate in the configured script processor"

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.