Skip to content

Instantly share code, notes, and snippets.

@ijokarumawak
Last active February 15, 2022 11:06
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save ijokarumawak/14d560fec5a052b3a157b38a11955772 to your computer and use it in GitHub Desktop.
Save ijokarumawak/14d560fec5a052b3a157b38a11955772 to your computer and use it in GitHub Desktop.

NiFi Example: Remove a Cache Entry with InvokeScriptedProcessor

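When no existing processor does what you want, you can create one with InvokeScriptedProcessor. In the template below, DetectDuplicate caches each FlowFile's `data.id`; after a non-duplicate FlowFile has been processed, the RemoveCache processor deletes that cache entry so that a later FlowFile with the same id is no longer treated as a duplicate.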

import org.apache.commons.lang3.StringUtils
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient
import org.apache.nifi.distributed.cache.client.Serializer
import org.apache.nifi.distributed.cache.client.exception.SerializationException
import org.apache.nifi.expression.AttributeExpression
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.processor.util.StandardValidators

import java.nio.charset.StandardCharsets

/**
 * Removes one entry from a DistributedMapCacheClient for every FlowFile it receives.
 *
 * The cache key is the "Cache Entry Identifier" property, evaluated with Expression Language
 * against the incoming FlowFile and serialized as UTF-8 bytes by {@code keySerializer}.
 *
 * Routing:
 *   success - cache.remove(...) completed without an IOException. The boolean it returns is
 *             not inspected, so keys that were not present in the cache also go to success.
 *   failure - the identifier evaluated to a blank value, or an IOException was raised while
 *             communicating with the cache. The FlowFile is penalized before the transfer.
 *
 * NOTE(review): REL_NOT_FOUND is declared but is not added to the relationship set, so no
 * FlowFile is ever routed to "not-found". Registering it would force every already-configured
 * flow to connect or auto-terminate a new relationship, so it is left unregistered here.
 */
class RemoveCache extends AbstractProcessor {

    /** The cache client whose entries are removed. */
    public static final PropertyDescriptor PROP_DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
            .name("Distributed Cache Service")
            .description("The Controller Service that is used to remove the cached values.")
            .required(true)
            .identifiesControllerService(DistributedMapCacheClient.class)
            .build()

    /** Expression evaluated per FlowFile to produce the key of the entry to remove. */
    public static final PropertyDescriptor PROP_CACHE_ENTRY_IDENTIFIER = new PropertyDescriptor.Builder()
            .name("Cache Entry Identifier")
            .description("A FlowFile attribute, or the results of an Attribute Expression Language statement, which will be evaluated "
            + "against a FlowFile in order to determine the key of the cache entry to remove")
            .required(true)
            .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
            // Single quotes keep Groovy from treating ${...} as GString interpolation.
            .defaultValue('${hash.value}')
            .expressionLanguageSupported(true)
            .build()

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("If the cache was successfully communicated with it will be routed to this relationship")
            .build()
    // Declared but not registered in the constructor; see the class-level note.
    public static final Relationship REL_NOT_FOUND = new Relationship.Builder()
            .name("not-found")
            .description("If a FlowFile's Cache Entry Identifier was not found in the cache, it will be routed to this relationship")
            .build()
    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("If unable to communicate with the cache or if the cache entry is evaluated to be blank, the FlowFile will be penalized and routed to this relationship")
            .build()

    /** Immutable set of relationships exposed by this processor (success and failure). */
    private final Set<Relationship> relationships

    /**
     * Writes a cache key as its UTF-8 bytes.
     * NOTE(review): presumably this must match the serializer used by whatever stored the entry
     * (e.g. DetectDuplicate) for remove() to find it - verify if the writer changes.
     */
    private final Serializer<String> keySerializer = new Serializer<String>() {
        @Override
        void serialize(String value, OutputStream output) throws SerializationException, IOException {
            output.write(value.getBytes(StandardCharsets.UTF_8))
        }
    }

    RemoveCache() {
        final Set<Relationship> rels = new HashSet<>()
        rels.add(REL_SUCCESS)
        rels.add(REL_FAILURE)
        relationships = Collections.unmodifiableSet(rels)
    }

    /** @return the identifier and cache-service properties, in that order */
    List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> descriptors = new ArrayList<>()
        descriptors.add(PROP_CACHE_ENTRY_IDENTIFIER)
        descriptors.add(PROP_DISTRIBUTED_CACHE_SERVICE)
        return descriptors
    }

    /** @return the unmodifiable relationship set built in the constructor */
    Set<Relationship> getRelationships() {
        return relationships
    }

    /**
     * Removes the cache entry identified by the next FlowFile and routes that FlowFile.
     *
     * @param context source of the configured identifier expression and cache service
     * @param session source of the FlowFile and target of the transfer/penalize calls
     * @throws ProcessException declared by the processor contract; IOExceptions from the cache
     *         are caught here and routed to failure instead of propagating
     */
    void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get()
        if (flowFile == null) {
            return
        }

        final ComponentLog logger = getLogger()
        final String cacheKey = context.getProperty(PROP_CACHE_ENTRY_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue()
        if (StringUtils.isBlank(cacheKey)) {
            // Coerce to Object[] so the logger receives the individual format arguments rather
            // than a single List object.
            logger.error("FlowFile {} has no attribute for given Cache Entry Identifier", [flowFile] as Object[])
            flowFile = session.penalize(flowFile)
            session.transfer(flowFile, REL_FAILURE)
            return
        }

        final DistributedMapCacheClient cache = context.getProperty(PROP_DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class)
        try {
            // The "was removed" result is deliberately not used: missing keys also go to success.
            cache.remove(cacheKey, keySerializer)
            session.transfer(flowFile, REL_SUCCESS)
        } catch (final IOException e) {
            flowFile = session.penalize(flowFile)
            session.transfer(flowFile, REL_FAILURE)
            // As an Object[], both placeholders get their own argument and the exception is the
            // trailing element rather than being folded into a List's toString().
            logger.error("Unable to communicate with cache when processing {} due to {}", [flowFile, e] as Object[])
        }
    }
}

// Script binding picked up by InvokeScriptedProcessor, which expects the processor instance
// in a variable named `processor`.
processor = new RemoveCache()
<?xml version="1.0" ?>
<template encoding-version="1.1">
<description></description>
<groupId>0cae3b5d-015d-1000-b9ed-64b180b79ed8</groupId>
<name>RemoveCacheExample</name>
<snippet>
<processGroups>
<id>35fe8898-bd57-3b85-0000-000000000000</id>
<parentGroupId>6018e1bb-e364-3d31-0000-000000000000</parentGroupId>
<position>
<x>0.0</x>
<y>0.0</y>
</position>
<comments></comments>
<contents>
<connections>
<id>a1d250a9-c861-3536-0000-000000000000</id>
<parentGroupId>35fe8898-bd57-3b85-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>681.4798583984375</x>
<y>594.5999145507812</y>
</bends>
<destination>
<groupId>35fe8898-bd57-3b85-0000-000000000000</groupId>
<id>b487b652-6262-3fe0-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>duplicate</selectedRelationships>
<source>
<groupId>35fe8898-bd57-3b85-0000-000000000000</groupId>
<id>996e015f-216b-3066-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>dd14aead-3c13-3555-0000-000000000000</id>
<parentGroupId>35fe8898-bd57-3b85-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>35fe8898-bd57-3b85-0000-000000000000</groupId>
<id>996e015f-216b-3066-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>35fe8898-bd57-3b85-0000-000000000000</groupId>
<id>40f5216d-1f3d-3d88-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>121f6822-4671-304c-0000-000000000000</id>
<parentGroupId>35fe8898-bd57-3b85-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>35fe8898-bd57-3b85-0000-000000000000</groupId>
<id>996e015f-216b-3066-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>35fe8898-bd57-3b85-0000-000000000000</groupId>
<id>54b9de9c-e795-32a8-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>23e41de5-2cff-3fa3-0000-000000000000</id>
<parentGroupId>35fe8898-bd57-3b85-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>35fe8898-bd57-3b85-0000-000000000000</groupId>
<id>66a203c2-9a9c-3c2f-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>35fe8898-bd57-3b85-0000-000000000000</groupId>
<id>3803fb76-ddad-3b91-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>38b486ca-bda7-3dfc-0000-000000000000</id>
<parentGroupId>35fe8898-bd57-3b85-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>906.119873046875</x>
<y>593.159912109375</y>
</bends>
<destination>
<groupId>35fe8898-bd57-3b85-0000-000000000000</groupId>
<id>b487b652-6262-3fe0-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>0</labelIndex>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<source>
<groupId>35fe8898-bd57-3b85-0000-000000000000</groupId>
<id>996e015f-216b-3066-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>3adf64fc-0e2e-3847-0000-000000000000</id>
<parentGroupId>35fe8898-bd57-3b85-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<destination>
<groupId>35fe8898-bd57-3b85-0000-000000000000</groupId>
<id>3803fb76-ddad-3b91-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>non-duplicate</selectedRelationships>
<source>
<groupId>35fe8898-bd57-3b85-0000-000000000000</groupId>
<id>996e015f-216b-3066-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>3aefc250-d1a2-3716-0000-000000000000</id>
<parentGroupId>35fe8898-bd57-3b85-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>1100.519775390625</x>
<y>679.5599365234375</y>
</bends>
<destination>
<groupId>35fe8898-bd57-3b85-0000-000000000000</groupId>
<id>b487b652-6262-3fe0-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>0</labelIndex>
<name></name>
<selectedRelationships>success</selectedRelationships>
<source>
<groupId>35fe8898-bd57-3b85-0000-000000000000</groupId>
<id>66a203c2-9a9c-3c2f-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<connections>
<id>43b75db5-8d01-3f12-0000-000000000000</id>
<parentGroupId>35fe8898-bd57-3b85-0000-000000000000</parentGroupId>
<backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold>
<backPressureObjectThreshold>10000</backPressureObjectThreshold>
<bends>
<x>1096.1998291015625</x>
<y>760.1998901367188</y>
</bends>
<destination>
<groupId>35fe8898-bd57-3b85-0000-000000000000</groupId>
<id>b487b652-6262-3fe0-0000-000000000000</id>
<type>PROCESSOR</type>
</destination>
<flowFileExpiration>0 sec</flowFileExpiration>
<labelIndex>1</labelIndex>
<name></name>
<selectedRelationships>failure</selectedRelationships>
<source>
<groupId>35fe8898-bd57-3b85-0000-000000000000</groupId>
<id>66a203c2-9a9c-3c2f-0000-000000000000</id>
<type>PROCESSOR</type>
</source>
<zIndex>0</zIndex>
</connections>
<controllerServices>
<id>bd6674a6-3b20-3cdb-0000-000000000000</id>
<parentGroupId>35fe8898-bd57-3b85-0000-000000000000</parentGroupId>
<bundle>
<artifact>nifi-distributed-cache-services-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<comments></comments>
<descriptors>
<entry>
<key>Server Hostname</key>
<value>
<name>Server Hostname</name>
</value>
</entry>
<entry>
<key>Server Port</key>
<value>
<name>Server Port</name>
</value>
</entry>
<entry>
<key>SSL Context Service</key>
<value>
<identifiesControllerService>org.apache.nifi.ssl.SSLContextService</identifiesControllerService>
<name>SSL Context Service</name>
</value>
</entry>
<entry>
<key>Communications Timeout</key>
<value>
<name>Communications Timeout</name>
</value>
</entry>
</descriptors>
<name>DistributedMapCacheClientService</name>
<persistsState>false</persistsState>
<properties>
<entry>
<key>Server Hostname</key>
<value>localhost</value>
</entry>
<entry>
<key>Server Port</key>
</entry>
<entry>
<key>SSL Context Service</key>
</entry>
<entry>
<key>Communications Timeout</key>
</entry>
</properties>
<state>ENABLED</state>
<type>org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService</type>
</controllerServices>
<processors>
<id>996e015f-216b-3066-0000-000000000000</id>
<parentGroupId>35fe8898-bd57-3b85-0000-000000000000</parentGroupId>
<position>
<x>592.1999793111697</x>
<y>422.0600709829333</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Cache Entry Identifier</key>
<value>
<name>Cache Entry Identifier</name>
</value>
</entry>
<entry>
<key>FlowFile Description</key>
<value>
<name>FlowFile Description</name>
</value>
</entry>
<entry>
<key>Age Off Duration</key>
<value>
<name>Age Off Duration</name>
</value>
</entry>
<entry>
<key>Distributed Cache Service</key>
<value>
<identifiesControllerService>org.apache.nifi.distributed.cache.client.DistributedMapCacheClient</identifiesControllerService>
<name>Distributed Cache Service</name>
</value>
</entry>
<entry>
<key>Cache The Entry Identifier</key>
<value>
<name>Cache The Entry Identifier</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Cache Entry Identifier</key>
<value>${data.id}</value>
</entry>
<entry>
<key>FlowFile Description</key>
<value>duplicate</value>
</entry>
<entry>
<key>Age Off Duration</key>
</entry>
<entry>
<key>Distributed Cache Service</key>
<value>bd6674a6-3b20-3cdb-0000-000000000000</value>
</entry>
<entry>
<key>Cache The Entry Identifier</key>
<value>true</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>DetectDuplicate</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>duplicate</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>non-duplicate</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.DetectDuplicate</type>
</processors>
<processors>
<id>b487b652-6262-3fe0-0000-000000000000</id>
<parentGroupId>35fe8898-bd57-3b85-0000-000000000000</parentGroupId>
<position>
<x>593.6399817525762</x>
<y>657.9600044546129</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Log Level</key>
<value>
<name>Log Level</name>
</value>
</entry>
<entry>
<key>Log Payload</key>
<value>
<name>Log Payload</name>
</value>
</entry>
<entry>
<key>Attributes to Log</key>
<value>
<name>Attributes to Log</name>
</value>
</entry>
<entry>
<key>attributes-to-log-regex</key>
<value>
<name>attributes-to-log-regex</name>
</value>
</entry>
<entry>
<key>Attributes to Ignore</key>
<value>
<name>Attributes to Ignore</name>
</value>
</entry>
<entry>
<key>attributes-to-ignore-regex</key>
<value>
<name>attributes-to-ignore-regex</name>
</value>
</entry>
<entry>
<key>Log prefix</key>
<value>
<name>Log prefix</name>
</value>
</entry>
<entry>
<key>character-set</key>
<value>
<name>character-set</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Log Level</key>
<value>info</value>
</entry>
<entry>
<key>Log Payload</key>
<value>false</value>
</entry>
<entry>
<key>Attributes to Log</key>
</entry>
<entry>
<key>attributes-to-log-regex</key>
<value>.*</value>
</entry>
<entry>
<key>Attributes to Ignore</key>
</entry>
<entry>
<key>attributes-to-ignore-regex</key>
</entry>
<entry>
<key>Log prefix</key>
</entry>
<entry>
<key>character-set</key>
<value>UTF-8</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>LogAttribute</name>
<relationships>
<autoTerminate>true</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.LogAttribute</type>
</processors>
<processors>
<id>3803fb76-ddad-3b91-0000-000000000000</id>
<parentGroupId>35fe8898-bd57-3b85-0000-000000000000</parentGroupId>
<position>
<x>1244.0598377096073</x>
<y>422.0600709829333</y>
</position>
<bundle>
<artifact>nifi-update-attribute-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Delete Attributes Expression</key>
<value>
<name>Delete Attributes Expression</name>
</value>
</entry>
<entry>
<key>Store State</key>
<value>
<name>Store State</name>
</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
<value>
<name>Stateful Variables Initial Value</name>
</value>
</entry>
<entry>
<key>data.processedAt</key>
<value>
<name>data.processedAt</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Delete Attributes Expression</key>
</entry>
<entry>
<key>Store State</key>
<value>Do not store state</value>
</entry>
<entry>
<key>Stateful Variables Initial Value</key>
</entry>
<entry>
<key>data.processedAt</key>
<value>${now()}</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Do something</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
</processors>
<processors>
<id>40f5216d-1f3d-3d88-0000-000000000000</id>
<parentGroupId>35fe8898-bd57-3b85-0000-000000000000</parentGroupId>
<position>
<x>825.4399158346075</x>
<y>183.09997515773796</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>File Size</key>
<value>
<name>File Size</name>
</value>
</entry>
<entry>
<key>Batch Size</key>
<value>
<name>Batch Size</name>
</value>
</entry>
<entry>
<key>Data Format</key>
<value>
<name>Data Format</name>
</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>
<name>Unique FlowFiles</name>
</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>
<name>generate-ff-custom-text</name>
</value>
</entry>
<entry>
<key>data.id</key>
<value>
<name>data.id</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>File Size</key>
<value>0B</value>
</entry>
<entry>
<key>Batch Size</key>
<value>2</value>
</entry>
<entry>
<key>Data Format</key>
<value>Text</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>false</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>${now()}</value>
</entry>
<entry>
<key>data.id</key>
<value>B</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>1 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Generate 2 Bs per sec</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.GenerateFlowFile</type>
</processors>
<processors>
<id>54b9de9c-e795-32a8-0000-000000000000</id>
<parentGroupId>35fe8898-bd57-3b85-0000-000000000000</parentGroupId>
<position>
<x>373.03995245570127</x>
<y>183.09997515773796</y>
</position>
<bundle>
<artifact>nifi-standard-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>File Size</key>
<value>
<name>File Size</name>
</value>
</entry>
<entry>
<key>Batch Size</key>
<value>
<name>Batch Size</name>
</value>
</entry>
<entry>
<key>Data Format</key>
<value>
<name>Data Format</name>
</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>
<name>Unique FlowFiles</name>
</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>
<name>generate-ff-custom-text</name>
</value>
</entry>
<entry>
<key>data.id</key>
<value>
<name>data.id</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>File Size</key>
<value>0B</value>
</entry>
<entry>
<key>Batch Size</key>
<value>2</value>
</entry>
<entry>
<key>Data Format</key>
<value>Text</value>
</entry>
<entry>
<key>Unique FlowFiles</key>
<value>false</value>
</entry>
<entry>
<key>generate-ff-custom-text</key>
<value>${now()}</value>
</entry>
<entry>
<key>data.id</key>
<value>A</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>1 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>Generate 2 As per sec</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.standard.GenerateFlowFile</type>
</processors>
<processors>
<id>66a203c2-9a9c-3c2f-0000-000000000000</id>
<parentGroupId>35fe8898-bd57-3b85-0000-000000000000</parentGroupId>
<position>
<x>1244.0598377096073</x>
<y>657.2400715932847</y>
</position>
<bundle>
<artifact>nifi-scripting-nar</artifact>
<group>org.apache.nifi</group>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<config>
<bulletinLevel>WARN</bulletinLevel>
<comments></comments>
<concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
<descriptors>
<entry>
<key>Script Engine</key>
<value>
<name>Script Engine</name>
</value>
</entry>
<entry>
<key>Script File</key>
<value>
<name>Script File</name>
</value>
</entry>
<entry>
<key>Script Body</key>
<value>
<name>Script Body</name>
</value>
</entry>
<entry>
<key>Module Directory</key>
<value>
<name>Module Directory</name>
</value>
</entry>
<entry>
<key>Cache Entry Identifier</key>
<value>
<name>Cache Entry Identifier</name>
</value>
</entry>
<entry>
<key>Distributed Cache Service</key>
<value>
<identifiesControllerService>org.apache.nifi.distributed.cache.client.DistributedMapCacheClient</identifiesControllerService>
<name>Distributed Cache Service</name>
</value>
</entry>
</descriptors>
<executionNode>ALL</executionNode>
<lossTolerant>false</lossTolerant>
<penaltyDuration>30 sec</penaltyDuration>
<properties>
<entry>
<key>Script Engine</key>
<value>Groovy</value>
</entry>
<entry>
<key>Script File</key>
</entry>
<entry>
<key>Script Body</key>
<value>import org.apache.commons.lang3.StringUtils
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient
import org.apache.nifi.distributed.cache.client.Serializer
import org.apache.nifi.distributed.cache.client.exception.SerializationException
import org.apache.nifi.expression.AttributeExpression
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.processor.util.StandardValidators
import java.nio.charset.StandardCharsets
// RemoveCache: deletes one DistributedMapCacheClient entry per FlowFile, keyed by the
// evaluated Cache Entry Identifier (serialized as UTF-8 bytes).
// success: cache.remove() returned without an IOException; its boolean result is not
// inspected, so keys that were not in the cache are routed to success as well.
// failure: blank identifier or IOException from the cache; the FlowFile is penalized first.
// NOTE(review): REL_NOT_FOUND is declared but never registered, so nothing reaches
// "not-found". Registering it would require this template to connect or auto-terminate it.
class RemoveCache extends AbstractProcessor {
public static final PropertyDescriptor PROP_DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
.name("Distributed Cache Service")
.description("The Controller Service that is used to remove the cached values.")
.required(true)
.identifiesControllerService(DistributedMapCacheClient.class)
.build()
public static final PropertyDescriptor PROP_CACHE_ENTRY_IDENTIFIER = new PropertyDescriptor.Builder()
.name("Cache Entry Identifier")
.description("A FlowFile attribute, or the results of an Attribute Expression Language statement, which will be evaluated "
+ "against a FlowFile in order to determine the key of the cache entry to remove")
.required(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
.defaultValue('${hash.value}')
.expressionLanguageSupported(true)
.build()
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("If the cache was successfully communicated with it will be routed to this relationship")
.build()
// Declared but not registered in the constructor; see the note above the class.
public static final Relationship REL_NOT_FOUND = new Relationship.Builder()
.name("not-found")
.description("If a FlowFile's Cache Entry Identifier was not found in the cache, it will be routed to this relationship")
.build()
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("If unable to communicate with the cache or if the cache entry is evaluated to be blank, the FlowFile will be penalized and routed to this relationship")
.build()
private final Set&lt;Relationship&gt; relationships
// Writes cache keys as UTF-8 bytes; presumably must match how the entry was stored (verify).
private final Serializer&lt;String&gt; keySerializer = new Serializer&lt;String&gt;() {
@Override
void serialize(String value, OutputStream output) throws SerializationException, IOException {
output.write(value.getBytes(StandardCharsets.UTF_8))
}
}
RemoveCache() {
final Set&lt;Relationship&gt; rels = new HashSet&lt;&gt;()
rels.add(REL_SUCCESS)
rels.add(REL_FAILURE)
relationships = Collections.unmodifiableSet(rels)
}
List&lt;PropertyDescriptor&gt; getSupportedPropertyDescriptors() {
final List&lt;PropertyDescriptor&gt; descriptors = new ArrayList&lt;&gt;()
descriptors.add(PROP_CACHE_ENTRY_IDENTIFIER)
descriptors.add(PROP_DISTRIBUTED_CACHE_SERVICE)
return descriptors
}
Set&lt;Relationship&gt; getRelationships() {
return relationships
}
// Removes the cache entry identified by the next FlowFile, then routes the FlowFile.
void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get()
if (flowFile == null) {
return
}
final ComponentLog logger = getLogger()
final String cacheKey = context.getProperty(PROP_CACHE_ENTRY_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue()
if (StringUtils.isBlank(cacheKey)) {
// Coerce to Object[] so the logger receives individual format arguments, not one List.
logger.error("FlowFile {} has no attribute for given Cache Entry Identifier", [flowFile] as Object[])
flowFile = session.penalize(flowFile)
session.transfer(flowFile, REL_FAILURE)
return
}
final DistributedMapCacheClient cache = context.getProperty(PROP_DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class)
try {
// The "was removed" result is not used: missing keys also go to success.
cache.remove(cacheKey, keySerializer)
session.transfer(flowFile, REL_SUCCESS)
} catch (final IOException e) {
flowFile = session.penalize(flowFile)
session.transfer(flowFile, REL_FAILURE)
// As an Object[], both placeholders get their own argument and the exception is the last one.
logger.error("Unable to communicate with cache when processing {} due to {}", [flowFile, e] as Object[])
}
}
}
// InvokeScriptedProcessor expects the processor instance in a variable named processor.
processor = new RemoveCache()</value>
</entry>
<entry>
<key>Module Directory</key>
</entry>
<entry>
<key>Cache Entry Identifier</key>
<value>${data.id}</value>
</entry>
<entry>
<key>Distributed Cache Service</key>
<value>bd6674a6-3b20-3cdb-0000-000000000000</value>
</entry>
</properties>
<runDurationMillis>0</runDurationMillis>
<schedulingPeriod>0 sec</schedulingPeriod>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<yieldDuration>1 sec</yieldDuration>
</config>
<name>RemoveCache</name>
<relationships>
<autoTerminate>false</autoTerminate>
<name>failure</name>
</relationships>
<relationships>
<autoTerminate>false</autoTerminate>
<name>success</name>
</relationships>
<style></style>
<type>org.apache.nifi.processors.script.InvokeScriptedProcessor</type>
</processors>
</contents>
<name>RemoveCacheExample</name>
</processGroups>
</snippet>
<timestamp>07/10/2017 10:35:41 JST</timestamp>
</template>
@behrouz-s
Copy link

It doesn't work!
This error is shown:
"Validation is invalid because an error occurred calling validate in the configured script processor"

@antonioparry
Copy link

It works in NiFi 1.8.0 but not in NiFi 1.11.3. Could this be a problem with NiFi itself executing the Groovy script?

@ijokarumawak
Copy link
Author

Hi @behrouz-s @antonioparry, thanks for flagging this. I've updated the template and tested with the latest NiFi branch (1.12.0-SNAPSHOT). It should work now.

@bfaubert
Copy link

I had to make a couple of modifications to get the not-found relationship working. I added rels.add(REL_NOT_FOUND) to the rels HashSet and checked the return value of cache.remove(), so that the FlowFile is routed to the proper relationship depending on whether or not the cache key was found.

if (cache.remove(cacheKey, keySerializer)) {
    session.transfer(flowFile, REL_SUCCESS)    
}
else {
    session.transfer(flowFile, REL_NOT_FOUND)
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment