Skip to content

Instantly share code, notes, and snippets.

package com.pardot.storm.grouping;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.grouping.CustomStreamGrouping;
import org.apache.storm.task.WorkerTopologyContext;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
(^void execute [this ^Tuple tuple]
(let [^RotatingMap pending (.getObject pending)
stream-id (.getSourceStreamId tuple)]
(if (= stream-id Constants/SYSTEM_TICK_STREAM_ID)
; If the stream is the system tick stream, rotate the pending map
(.rotate pending)
; Otherwise, bind id to the tuple's value at index 0
(let [id (.getValue tuple 0)
; grab our output collector
^OutputCollector output-collector (.getObject output-collector)