Skip to content

Instantly share code, notes, and snippets.

@kapilreddy
Last active April 12, 2017 11:32
Show Gist options
  • Save kapilreddy/a25c3efe37c04283e22b to your computer and use it in GitHub Desktop.
Save kapilreddy/a25c3efe37c04283e22b to your computer and use it in GitHub Desktop.
Kafka message offset finder
;; project.clj
;; [clj-time "0.6.0"]
;; [org.clojure/data.json "0.2.4"]
;; [clj-kafka "0.2.8-0.8.1.1"]
;; Utility to find the offset in a given Kafka topic that corresponds to a
;; given cursor/point in time. The code assumes that each message carries a
;; monotonically increasing number (e.g. a Unix timestamp) that a predicate
;; can compare against the desired point.
(ns find-offset
(:require [clj-kafka.consumer.simple :refer [consumer
messages
topic-offset
topic-meta-data]]
[clj-time.core :as ct]
[clojure.data.json :refer [read-str]])
(:import [org.joda.time DateTime]))
(defn find-msg-at-offset
  "Searches one partition of `topic` for the message at which `compare-pred`
  flips, using a halving-step search between the earliest and latest offsets.

  Arguments:
    compare-pred - function of a fetched message; truthy means \"move forward\",
                   falsey means \"move backward\".
    topic        - topic name.
    partition-id - partition to search.
    broker       - map with :host and :port of the broker to connect to.

  Options (keyword args):
    :max-jumps  - upper bound on the number of search steps (default 10).
    :fetch-size - fetch size passed to `messages` for every probe
                  (default 10000).

  Returns the last message of the batch fetched at the final probe position,
  or nil when that fetch yields no messages (e.g. an empty partition).

  Throws Exception when the offset range needs more than `max-jumps` steps."
  [compare-pred topic partition-id {:keys [host port]} & {:keys [max-jumps fetch-size]
                                                          :or {max-jumps 10
                                                               fetch-size 10000}}]
  ;; with-open guarantees the consumer connection is closed on every exit
  ;; path (normal return or exception); previously it was never closed.
  (with-open [c (consumer host port "offset-finder")]
    (let [earliest-offset (topic-offset c topic partition-id :earliest)
          latest-offset (topic-offset c topic partition-id :latest)
          ;; quot keeps the arithmetic in longs: no intermediate Ratio and no
          ;; int-range failure for large offset ranges.
          offset-diff (quot (- latest-offset earliest-offset) 2)
          ;; log(0) is -Infinity, which cannot be converted to an integer, so
          ;; a range of zero or one offsets is treated as needing no jumps.
          required-jumps (if (pos? offset-diff)
                           (long (Math/ceil (/ (Math/log offset-diff)
                                               (Math/log 2))))
                           0)]
      (when (> required-jumps max-jumps)
        (throw (Exception. (format "Finding offset will take more than %d. %d jumps needed to find the offset."
                                   max-jumps
                                   required-jumps))))
      (loop [offset earliest-offset
             incr-by offset-diff]
        ;; `last` fully realizes the fetched batch, so nothing lazy escapes
        ;; the consumer's lifetime.
        (let [msg (last (messages c
                                  "offset-finder"
                                  topic
                                  partition-id
                                  offset
                                  fetch-size))]
          (cond
            ;; Step size exhausted: the current probe is the answer.
            (zero? incr-by)
            msg

            ;; Predicate holds: the target lies further ahead.
            (and msg (compare-pred msg))
            (recur (+ offset incr-by) (quot incr-by 2))

            ;; No message here, or predicate fails: step back.
            ;; NOTE(review): this can move the probe below the earliest
            ;; offset; behaviour of the fetch in that case is not visible
            ;; here -- confirm.
            :else
            (recur (- offset incr-by) (quot incr-by 2))))))))
(defn find-offsets
  "For every partition of `topic`, finds the offset of the message located by
  `find-msg-at-offset` with predicate `pred`.

  Arguments:
    topic  - topic name.
    broker - map with :host and :port of a broker used to fetch topic
             metadata; each partition is then searched on its own leader.
    pred   - predicate passed through to `find-msg-at-offset`.

  Options (keyword args):
    :max-jumps  - passed through to `find-msg-at-offset` (default 10).
    :fetch-size - passed through to `find-msg-at-offset` (default 10000).

  Returns a lazy seq of maps {:topic .. :partition-id .. :offset ..}, one per
  partition; :offset is nil when no message was found for that partition."
  [topic {:keys [host port]} pred & {:keys [max-jumps
                                            fetch-size]
                                     :or {max-jumps 10
                                          fetch-size 10000}}]
  (let [partitions (with-open [c (consumer host port "offset-finder")]
                     ;; Realize the metadata before the connection is closed;
                     ;; the consumer is only needed for this one request and
                     ;; was previously left open.
                     (doall (:partition-metadata (first (topic-meta-data c [topic])))))]
    (map (fn [{:keys [leader partition-id]}]
           (let [{:keys [offset]} (find-msg-at-offset pred
                                                      topic
                                                      partition-id
                                                      leader
                                                      :max-jumps max-jumps
                                                      :fetch-size fetch-size)]
             {:topic topic
              :partition-id partition-id
              :offset offset}))
         partitions)))
(comment
  ;; Example: locate, per partition, the offset around 2015-04-10 for
  ;; messages whose JSON payload carries a :msg_ts field (epoch millis)
  ;; under :kafka_meta.
  (let [cutoff-ms (.getMillis (ct/date-time 2015 4 10))
        ;; The predicate is true while the monotonically increasing number
        ;; in the message is at or before the point of interest, and false
        ;; once the number is past that point.
        at-or-before-cutoff? (fn [msg]
                               (let [payload (read-str (String. (:value msg))
                                                       :key-fn keyword)
                                     msg-ts (:msg_ts (:kafka_meta payload))]
                                 (>= cutoff-ms msg-ts)))]
    (find-offsets "kafka-topic"
                  {:host "kafka-broker"
                   :port 9092}
                  at-or-before-cutoff?
                  :max-jumps 20
                  :fetch-size 10000)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment