Last active: December 26, 2015 14:19
Consume a kafka message stream of filtered keys
(def consumer-registry (atom {}))

(defn get-or-create-message-stream [topic]
  (if-let [message-stream (get @consumer-registry topic)]
    message-stream
    (let [[cnsmr messages] (kafka/open-consumer "consumer-registry" topic)
          new-message-stream (ms/->source messages)]
      ;; create lifespan for consumer of 1 minute, should probably be a component
      (a/go (a/<! (a/timeout 60e3))
            (kafka/shutdown cnsmr)
            (swap! consumer-registry dissoc topic))
      ;; register
      (swap! consumer-registry assoc topic new-message-stream)
      ;; return lazy-seq of messages
      new-message-stream)))

(defn personal-subscriber [topic user-key]
  (let [message-stream (get-or-create-message-stream topic)]
    (ms/filter
     #(when-let [kafka-key (:key %)]
        (= user-key (bs/convert kafka-key String)))
     message-stream)))

(comment ;; example
  (def personal-messages (personal-subscriber "bones.jobs-test..wat-output" "user-123"))
  (kafka/produce "bones.jobs-test..wat-output" "user-456" "hello123")
  (kafka/produce "bones.jobs-test..wat-output" "user-123" "hello123")
  (kafka/produce "bones.jobs-test..wat-output" "user-789" "hello123")
  (ms/consume println personal-messages)
  ;; offset of 1, not 0 or 2
  ;; => #clj_kafka.core.KafkaMessage{:topic bones.jobs-test..wat-output, :offset 1, ...
  )
Hold on, this doesn't work. Since the iterator returned from clj-kafka never has .next called on it, we only see the first message the consumer receives. We could use a doseq and core.async combo, but let's sideline this approach for now. I'm also concerned about missed messages: when a message is skipped because its kafka-key doesn't equal the user-key, the consumer's offset is still updated, so that message will be lost. … sync-output), and we'd need to ack the message id on the peer before any other peer does - ouch. Let's come back to the sync response later. Let's see how weird it is to consume an event stream in the browser (SSE) and respond to the user that way. This will still require one kafka consumer thread per user, but at least it isn't one per request. (Perhaps there is a possibility of using redis instead of zookeeper later on, with kafka-fast?)