Skip to content

Instantly share code, notes, and snippets.

@kirankulkarni
Forked from kapilreddy/find-offsets.clj
Last active April 17, 2017 12:28
Show Gist options
  • Save kirankulkarni/b79c1930cd86c7ddc06f3884d79579fc to your computer and use it in GitHub Desktop.
Save kirankulkarni/b79c1930cd86c7ddc06f3884d79579fc to your computer and use it in GitHub Desktop.
Kafka message offset finder
;; project.clj
;; [clj-time "0.6.0"]
;; [org.clojure/data.json "0.2.4"]
;; [clj-kafka "0.2.8-0.8.1.1"]
;; Utility to find, for each partition of a given Kafka topic, the offset
;; corresponding to a given cursor/point in time. The code assumes that each
;; message carries a monotonically increasing number (e.g. a Unix timestamp)
;; and binary-searches each partition on it.
(ns find-offset
(:require [clj-kafka.consumer.simple :refer [consumer
messages
topic-meta-data]]
[clj-time.core :as ct]
[clojure.data.json :refer [read-str]])
(:import [org.joda.time DateTime]
[kafka.javaapi.consumer SimpleConsumer]
[kafka.common TopicAndPartition]
[kafka.javaapi OffsetRequest OffsetResponse ]
[kafka.api FetchRequest FetchRequestBuilder PartitionOffsetRequestInfo]))
(defn get-topic-offset
  "Asks the broker behind `c` for an offset of `topic`/`partition-id`
  relative to `time-ms`, via an OffsetRequest sent with client id
  \"clj-kafka-id\".

  `time-ms` must be a positive integer, or -1 / -2 (callers in this file use
  -1 for the latest and -2 for the earliest offset).

  Returns the first offset in the response for that topic/partition, or nil
  when the response carries no offsets."
  [^SimpleConsumer c topic partition-id time-ms]
  {:pre [(and (integer? time-ms)
              (or (pos? time-ms)
                  (= -1 time-ms)
                  (= -2 time-ms)))]}
  ;; The trailing `1` is presumably the max number of offsets to return
  ;; (Kafka PartitionOffsetRequestInfo) -- only the first is used below.
  (let [request-info {(TopicAndPartition. ^String topic partition-id)
                      (PartitionOffsetRequestInfo. time-ms 1)}
        request (OffsetRequest. (java.util.HashMap. request-info)
                                (kafka.api.OffsetRequest/CurrentVersion)
                                "clj-kafka-id")
        ^OffsetResponse response (.getOffsetsBefore c request)]
    (-> response
        (.offsets topic partition-id)
        first)))
(defn find-msg-at-offset
  "Binary-searches partition `partition-id` of `topic` on the broker at
  host/port for the point where `compare-pred` switches from truthy to
  falsey.

  `compare-pred` receives a message as returned by clj-kafka's `messages`.
  It should return truthy while the message is at or before the target
  point, and falsey once it is past it.

  Options:
    :max-jumps          - throw instead of searching if more than this many
                          halvings would be needed (default 10).
    :fetch-size         - fetch size passed to `messages` (default 10000).
    :offset-search-time - optional time in ms used to pick the lower bound
                          via `get-topic-offset`. Falls back to the earliest
                          offset when that lookup returns nil.

  Returns the last message of the batch fetched at the final offset, or nil
  if that fetch returned nothing. Throws an Exception when more than
  :max-jumps jumps would be required. The consumer opened here is always
  closed before returning or throwing."
  [compare-pred topic partition-id {:keys [host port]} & {:keys [max-jumps fetch-size
                                                               offset-search-time]
                                                        :or {max-jumps 10
                                                             fetch-size 10000}}]
  ;; with-open guarantees the consumer's socket is released, including on
  ;; the max-jumps throw path (the original never closed it).
  (with-open [^SimpleConsumer c (consumer host port "offset-finder")]
    (let [earliest-offset (or (when offset-search-time
                                (get-topic-offset c topic partition-id offset-search-time))
                              (get-topic-offset c topic partition-id -2))
          latest-offset (get-topic-offset c topic partition-id -1)
          ;; Kafka offsets are longs: `quot` stays in long arithmetic, whereas
          ;; `int` throws once the range exceeds Integer/MAX_VALUE.
          offset-diff (quot (- latest-offset earliest-offset) 2)
          ;; Exact ceil(log2(offset-diff)) in integer arithmetic. The old
          ;; float log ratio could round past exact powers of two, and for
          ;; offset-diff = 0 it produced -Infinity, whose int cast threw.
          required-jumps (if (pos? offset-diff)
                           (- 64 (Long/numberOfLeadingZeros (dec offset-diff)))
                           0)]
      (when (> required-jumps max-jumps)
        (throw (Exception. (format "Finding offset will take more than %d. %d jumps needed to find the offset."
                                   max-jumps
                                   required-jumps))))
      (loop [offset earliest-offset
             incr-by offset-diff]
        ;; Only the last message of the fetched batch is compared/returned.
        (let [msg (last (messages c
                                  "offset-finder"
                                  topic
                                  partition-id
                                  offset
                                  fetch-size))]
          (cond
            (zero? incr-by) msg
            ;; Still at/before the target: move forward by half the step.
            (and msg (compare-pred msg)) (recur (+ offset incr-by)
                                                (quot incr-by 2))
            ;; Past the target, or nothing fetched: move back.
            :else (recur (- offset incr-by)
                         (quot incr-by 2))))))))
(defn find-offsets
  "For every partition listed in the topic metadata of `topic` (fetched from
  the broker at host/port), runs `find-msg-at-offset` against that
  partition's leader with predicate `pred`.

  Options :max-jumps (default 10), :fetch-size (default 10000) and
  :offset-search-time are passed through to `find-msg-at-offset`.

  Returns a fully realized seq of maps with keys :topic, :partition-id and
  :offset. :offset is nil when the search found no message."
  [topic {:keys [host port]} pred & {:keys [max-jumps
                                            fetch-size
                                            offset-search-time]
                                     :or {max-jumps 10
                                          fetch-size 10000}}]
  (let [partitions (with-open [^SimpleConsumer c (consumer host port "offset-finder")]
                     ;; Realize the metadata before the consumer is closed.
                     (doall (:partition-metadata (first (topic-meta-data c [topic])))))]
    ;; doall: the per-partition searches do network I/O, so run them now
    ;; rather than whenever a caller happens to consume the lazy seq.
    (doall
     (map (fn [{:keys [leader partition-id]}]
            (let [{:keys [offset]} (find-msg-at-offset pred
                                                       topic
                                                       partition-id
                                                       leader
                                                       :max-jumps max-jumps
                                                       :fetch-size fetch-size
                                                       :offset-search-time offset-search-time)]
              {:topic topic
               :partition-id partition-id
               :offset offset}))
          partitions))))
(comment
  ;; Example: find, per partition, the offset of messages up to 2015-04-10.
  (let [dt (ct/date-time 2015 4 10)]
    (find-offsets "kafka-topic"
                  {:host "kafka-broker"
                   :port 9092}
                  (fn [msg]
                    ;; Predicate that returns true if the monotonically
                    ;; increasing number in your message is less than or
                    ;; equal to the point at which you want the offset.
                    ;; Returns false if the number is past the point.
                    ;; Assumes the payload is JSON bytes; decode them as
                    ;; UTF-8 explicitly instead of the platform default.
                    (let [{:keys [kafka_meta]} (-> msg
                                                   :value
                                                   (String. "UTF-8")
                                                   (read-str :key-fn keyword))]
                      (<= (:msg_ts kafka_meta)
                          (.getMillis dt))))
                  :max-jumps 20
                  :fetch-size 10000
                  :offset-search-time (.getMillis dt))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment