Skip to content

Instantly share code, notes, and snippets.

@rboyd
Created March 26, 2014 16:08
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rboyd/9786887 to your computer and use it in GitHub Desktop.
Save rboyd/9786887 to your computer and use it in GitHub Desktop.
dedup using first unique window with core.async
(ns cep-dedup
(:require [clj-time.coerce :as c]
[clojure.core.async :refer :all]))
; Something like Esper's std:firstunique window: within a given time window, ignore duplicate messages
; that arrive on any channel other than the one that delivered the first unique copy. Subsequent
; duplicates on that first receiving channel pass through.
(defn update-sources
  "Remove bookkeeping for messages that are leaving the buffer.

  For each message in `msgs`, looks up the set stored in `sources` under
  (hash message) and drops every record whose :ts equals `min-time`.
  If no records remain, the key is removed from the map entirely.
  Returns the updated `sources` map."
  [sources msgs min-time]
  (letfn [(expire-one [acc msg]
            (let [k         (hash msg)
                  survivors (set (remove #(= (:ts %) min-time) (get acc k)))]
              (if (seq survivors)
                (assoc acc k survivors)
                (dissoc acc k))))]
    ;; Fully qualified: core.async's :refer :all shadows `reduce` in this ns.
    (clojure.core/reduce expire-one sources msgs)))
(defn add-msg
  "Record message `msg` received on channel `ch`, stamped with the current time.

  Returns a vector [sources' buffer'] where
    sources' has {:source ch :ts <now>} added to the set stored under (hash msg), and
    buffer'  has {:body msg} added to the set stored under <now>.
  Missing keys start from an empty set. Updates go through assoc, so a
  sorted-map `buffer` stays a sorted map.

  NOTE(review): <now> is a java.util.Date (millisecond resolution), and both
  maps hold sets. Two identical messages from the same channel within the same
  millisecond therefore collapse into a single entry."
  [sources buffer ch msg]
  (let [ts         (java.util.Date.)
        ;; Builds or extends each set without needing clojure.set,
        ;; which this namespace does not require.
        add-to-set (fnil conj #{})]
    [(update-in sources [(hash msg)] add-to-set {:source ch :ts ts})
     (update-in buffer [ts] add-to-set {:body msg})]))
(defn dedup
  "Start a thread that deduplicates messages read from `recv-chs` onto `out-ch`
  using a first-unique window of `window-timespan` milliseconds.

  Messages are matched by (hash body), so distinct bodies that collide on
  hash are also treated as duplicates. A message is accepted when no matching
  record is buffered, or when a matching record came from the same channel.
  Matches arriving from other channels are dropped. Accepted messages are
  buffered by arrival time. Once `window-timespan` ms have passed since a
  timestamp was recorded, its messages are put onto `out-ch` from go blocks.

  A closed input channel is removed from the set being read. Once every input
  channel is closed and the buffer has been flushed, the thread exits.
  `out-ch` is never closed here.

  Returns the channel returned by core.async `thread`."
  [recv-chs out-ch window-timespan]
  (thread
    (loop [sources  {}           ; hash(message) => #{{:source <ch> :ts <#inst>}, ...}
           buffer   (sorted-map) ; <#inst (timestamp)> => #{{:body full-msg}, ...}
           open-chs (vec recv-chs)]
      (let [min-time (first (keys buffer))]
        ;; Stop only when there is nothing left to read AND nothing left to emit;
        ;; (alts!! []) would otherwise block forever.
        (when (or (seq open-chs) min-time)
          (let [timeout-ch (when min-time
                             (timeout (- (+ window-timespan (c/to-long min-time))
                                         (c/to-long (java.util.Date.)))))
                [v ch]     (alts!! (if timeout-ch (conj open-chs timeout-ch) open-chs))]
            (cond
              ;; Oldest buffered timestamp has aged out of the window: emit its messages.
              (= ch timeout-ch)
              (let [msgs (get buffer min-time)]
                ;; NOTE(review): one go block per message, so output order is
                ;; presumably not guaranteed.
                (doseq [msg msgs]
                  (go (>! out-ch (:body msg))))
                ;; remove from sources where hash(msg body) and :ts match,
                ;; and drop the flushed timestamp from the buffer
                (recur (update-sources sources (clojure.core/map :body msgs) min-time)
                       (dissoc buffer min-time)
                       open-chs))

              ;; alts!! yields nil when `ch` is closed. Without this branch a closed
              ;; input is read forever, buffering nil and spinning the loop.
              (nil? v)
              (recur sources buffer (filterv #(not= % ch) open-chs))

              :else
              (let [existing (get sources (hash v)) ; #{{:source <ch> :ts <inst>}, ...}
                    [new-sources new-buffer]
                    (if (or (nil? existing)                      ; no existing msg
                            (some #(= (:source %) ch) existing)) ; same body, same source
                      (add-msg sources buffer ch v)
                      [sources buffer])]                         ; same body, other source: ignore
                (recur new-sources new-buffer open-chs)))))))))
(comment
  ;; REPL walkthrough: two input channels feeding one deduplicated output channel.
  (def sch1 (chan))
  (def sch2 (chan))
  (def och1 (chan))

  ;; Deduplicate with a 500 ms first-unique window.
  (dedup [sch1 sch2] och1 500)

  ;; Print whatever the deduplicator emits.
  (thread
    (loop []
      (println "received" (<!! och1))
      (recur)))

  ;; The same body arrives on two channels. The sch1 copy is kept. The sch2 copy
  ;; is dropped if it arrives while the sch1 copy is still buffered (within 500 ms),
  ;; in which case "received {:msg :data}" should print only once.
  (>!! sch1 {:msg :data})
  (>!! sch2 {:msg :data})
  )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment