Skip to content

Instantly share code, notes, and snippets.

@tristanstraub
Last active January 8, 2019 20:25
Show Gist options
  • Save tristanstraub/e5f50698d04aa3cc3bac6baf71943cae to your computer and use it in GitHub Desktop.
Save tristanstraub/e5f50698d04aa3cc3bac6baf71943cae to your computer and use it in GitHub Desktop.
kafka
(ns watermarks)
(defn partition-of
[partitions key]
(mod (.hashCode "test") partitions))
(def partitions 12)
(defn produce!
[broker topic key message]
(swap! broker update-in
[:topics topic :data (partition-of partitions
key)]
(fnil conj []) message))
(defn consume!
[broker consumer topic partition]
(let [offset (get-in @broker [:consumers consumer topic partition] 0)
message (get-in @broker [:topics topic :data partition offset])]
(when message
(swap! broker update-in
[:consumers consumer topic partition] (fnil inc 0)))
message))
(comment
:coordinator/received-all-partitions-of-frame "To each active key of inbound"
:simulator/key-sent-all-outbound "To all active key of outbound"
)
(defn simulate!
[broker simulator]
(->> (doall (map #(consume! broker "simulator" "objects" %) (range partitions)))
(group-by :timestamp)
))
(defn coordinate!
[broker coordinator]
)
(let [broker (atom nil)
simulator (atom {:topic "objects"})
coordinator (atom nil)]
(produce! broker "test" "one" "two")
(produce! broker "test" "one" "three")
(consume! broker "c" "test" 10)
(consume! broker "c" "test" 10))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment