@teaforthecat
Last active December 26, 2015 14:19
Consume a kafka message stream of filtered keys
(def consumer-registry (atom {}))

(defn get-or-create-message-stream [topic]
  (if-let [message-stream (get @consumer-registry topic)]
    message-stream
    (let [[cnsmr messages] (kafka/open-consumer "consumer-registry" topic)
          new-message-stream (ms/->source messages)]
      ;; give the consumer a lifespan of one minute; should probably be a component
      (a/go (a/<! (a/timeout 60e3))
            (kafka/shutdown cnsmr)
            (swap! consumer-registry dissoc topic))
      ;; register
      (swap! consumer-registry assoc topic new-message-stream)
      ;; return a manifold source of messages
      new-message-stream)))

(defn personal-subscriber [topic user-key]
  (let [message-stream (get-or-create-message-stream topic)]
    (ms/filter
     #(when-let [kafka-key (:key %)]
        (= user-key (bs/convert kafka-key String)))
     message-stream)))

(comment ;; example
  (def personal-messages (personal-subscriber "bones.jobs-test..wat-output" "user-123"))
  (kafka/produce "bones.jobs-test..wat-output" "user-456" "hello123")
  (kafka/produce "bones.jobs-test..wat-output" "user-123" "hello123")
  (kafka/produce "bones.jobs-test..wat-output" "user-789" "hello123")
  (ms/consume println personal-messages)
  ;; offset of 1, not 0 or 2
  ;; => #clj_kafka.core.KafkaMessage{:topic bones.jobs-test..wat-output, :offset 1, ...
  )
@teaforthecat (Author)

a note on the libraries used:

(require '[clojure.core.async :as a])
(require '[manifold.stream :as ms])
(require '[byte-streams :as bs])
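For reference, byte-streams is what turns the raw kafka key bytes back into a string inside personal-subscriber's filter:

```clojure
;; clj-kafka hands back the message key as a byte array;
;; bs/convert coerces it to a String (UTF-8 by default)
(bs/convert (.getBytes "user-123") String)
;; => "user-123"
```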

@teaforthecat (Author)

Hold on, this doesn't work. Since the iterator returned by clj-kafka never has .next called on it, we only ever see the first message the consumer receives. We could use a doseq and core.async combo, but let's sideline this approach for now. I'm also concerned about missed messages: when a message is skipped because its kafka-key doesn't equal the user-id, the consumer's offset is still advanced, so that message is lost.
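The doseq and core.async combo mentioned above could look roughly like this (a sketch only; `kafka/open-consumer` and `kafka/shutdown` are this gist's own wrappers, and the channel size and function name are assumptions):

```clojure
(require '[clojure.core.async :as a])

;; Sketch: pump the clj-kafka message seq onto a core.async channel,
;; so the underlying iterator is actually driven instead of only
;; yielding its first message.
(defn pump-messages [topic]
  (let [[cnsmr messages] (kafka/open-consumer "consumer-registry" topic)
        ch (a/chan 16)]
    (a/thread
      (doseq [m messages]      ; realizes the seq, driving .next on the iterator
        (a/>!! ch m))
      (a/close! ch))
    ;; shut the consumer down after the one-minute lifespan, as before
    (a/go (a/<! (a/timeout 60e3))
          (kafka/shutdown cnsmr)
          (a/close! ch))
    ch))
```

Note this still has the offset problem described above: messages filtered out for one user advance the shared consumer's offset and are lost to everyone else.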

  • An alternative would be one consumer thread per user, with filtering. That might not be scalable, though, given the limited supply of threads and the consumer registry's reliance on zookeeper (clustered filesystem ops on each request? no).
  • Another approach could be a special onyx peer whose only task is to receive the output on a core.async channel. That would be ideal, really, because it would be the most direct path for the data: fast and secure. This peer would run on the web server process and would need to be protected from doing background work. I believe this would be very complicated, though, if even possible. We would need a special task whitelist for this peer (perhaps something like: anything ending in sync-output), and we'd need to ack the message id on the peer before any other peer does - ouch.
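The first alternative (one consumer per user) might be sketched like this; the per-user consumer-group naming is an assumption, but it is what would give each user an independent offset, so messages skipped by the filter aren't lost to anyone else:

```clojure
(defn personal-consumer [topic user-key]
  ;; A consumer group per user means a separate offset per user,
  ;; at the cost of one consumer thread and one zookeeper
  ;; registration per user.
  (let [[cnsmr messages] (kafka/open-consumer (str "user-" user-key) topic)]
    (ms/filter
     #(when-let [kafka-key (:key %)]
        (= user-key (bs/convert kafka-key String)))
     (ms/->source messages))))
```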

Let's come back to the sync response later. Let's see how weird it is to consume an event stream in the browser (SSE) and respond to the user that way. This will still require one kafka consumer thread per user, but at least it isn't one per request. (Perhaps there is a possibility of using redis instead of zookeeper later on, with kafka-fast?)
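Wiring personal-subscriber to an SSE response could be sketched like this (assuming an aleph/ring-style handler where the body may be a manifold stream; the handler name and param lookup are assumptions, and the "data:" framing follows the SSE wire format):

```clojure
(defn sse-handler [req]
  (let [user-key (get-in req [:params :user-key])
        messages (personal-subscriber "bones.jobs-test..wat-output" user-key)]
    {:status  200
     :headers {"Content-Type" "text/event-stream"}
     ;; format each kafka message value as an SSE "data:" frame
     :body    (ms/map #(str "data: " (bs/convert (:value %) String) "\n\n")
                      messages)}))
```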
