@Crim
Created November 8, 2015 04:23
(^void execute [this ^Tuple tuple]
  (let [^RotatingMap pending (.getObject pending)
        stream-id (.getSourceStreamId tuple)]
    (if (= stream-id Constants/SYSTEM_TICK_STREAM_ID)
      ; If the tuple came from the system tick stream, rotate the pending map
      (.rotate pending)
      ; Otherwise, take the id from field 0 of the tuple
      (let [id (.getValue tuple 0)
            ; grab our output collector
            ^OutputCollector output-collector (.getObject output-collector)
            ; grab our current value from the rotating map using the tuple id
            curr (.get pending id)
            ; update it based on which stream the tuple came from
            curr (condp = stream-id
                   ; Init stream (new tuple): XOR in field 1 and record the spout task (field 2)
                   ACKER-INIT-STREAM-ID (-> curr
                                            (update-ack (.getValue tuple 1))
                                            (assoc :spout-task (.getValue tuple 2)))
                   ; Ack stream: XOR the incoming value (field 1) into the current value
                   ACKER-ACK-STREAM-ID (update-ack curr (.getValue tuple 1))
                   ; Fail stream: set the :failed key to true
                   ACKER-FAIL-STREAM-ID (assoc curr :failed true))]
        ; put the value back into our rotating map;
        ; this should extend its timeout
        (.put pending id curr)
        (when (and curr (:spout-task curr))
          (cond
            ; if our current value has been XOR'd to 0,
            ; tell the spout the tuple is complete
            (= 0 (:val curr))
            (do
              (.remove pending id)
              (acker-emit-direct output-collector
                                 (:spout-task curr)
                                 ACKER-ACK-STREAM-ID
                                 [id]))
            ; else, if the :failed key is set,
            ; we fail the tuple
            (:failed curr)
            (do
              (.remove pending id)
              (acker-emit-direct output-collector
                                 (:spout-task curr)
                                 ACKER-FAIL-STREAM-ID
                                 [id]))))
        ; Lastly, ack the tuple
        (.ack output-collector tuple)))))
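
;; The XOR bookkeeping described in the comments above can be sketched in
;; isolation. This is a minimal sketch, not the code from this gist:
;; `sketch-update-ack` and `sketch-status` are hypothetical names, and the
;; sketch ASSUMES that update-ack XORs the incoming value into the entry's
;; :val key (treating a missing :val as 0). No Storm classes are needed.

(defn sketch-update-ack
  "Hypothetical stand-in for update-ack: XOR v into the entry's :val."
  [entry v]
  (assoc entry :val (bit-xor (get entry :val 0) v)))

(defn sketch-status
  "Hypothetical helper mirroring the when/cond above.
  Returns :acked, :failed, or :pending for a pending-map entry."
  [entry]
  (cond
    ; no entry, or no spout task yet (an ack arrived before init): keep waiting
    (not (and entry (:spout-task entry))) :pending
    ; every value has been XOR'd in twice, so they cancel out to 0
    (= 0 (:val entry)) :acked
    ; an explicit failure was reported
    (:failed entry) :failed
    :else :pending))

;; Usage, with an arbitrary value 0x5A and spout task 7 (both made up):
(def init-entry
  (-> nil
      (sketch-update-ack 0x5A)
      (assoc :spout-task 7)))

(sketch-status init-entry)                          ;; => :pending
(sketch-status (sketch-update-ack init-entry 0x5A)) ;; => :acked
(sketch-status (assoc init-entry :failed true))     ;; => :failed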