Skip to content

Instantly share code, notes, and snippets.

@fr33m0nk
Last active December 21, 2022 14:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fr33m0nk/0958c61377d429e8690a8d7fe0ee9cc4 to your computer and use it in GitHub Desktop.
Save fr33m0nk/0958c61377d429e8690a8d7fe0ee9cc4 to your computer and use it in GitHub Desktop.
Kafka consumer actor model based on pull semantics
(ns clj-actor.promesa-actor
(:require
[clojure.java.data :as j]
[nio-kafka.consumer :as c]
[promesa.core :as p]
[promesa.exec.csp :as sp])
(:import (org.apache.kafka.clients.consumer ConsumerRecords KafkaConsumer OffsetAndMetadata)
(org.apache.kafka.common TopicPartition)))
;;;;;;;;;;;;;;;;;;;;;;;;;; Helper fns
(defn consumer-records->data-chan-fn
  "Forwards a polled batch to `channel`, skipping empty batches.

  Returns whatever `sp/put!` returns when the batch is forwarded, or nil
  when `consumer-records` is empty and nothing is put."
  [^ConsumerRecords consumer-records channel]
  (let [empty-batch? (.isEmpty consumer-records)]
    (if empty-batch?
      nil
      (sp/put! channel consumer-records))))
(defn fetch-next-records
  "Performs one pull cycle: resumes `kafka-consumer`, polls it once (500 is
  passed as the poll timeout argument, presumably milliseconds), forwards a
  non-empty batch to `data-channel`, then pauses the consumer again on its
  current assignment.

  Returns the result of `c/pause-consumer` on success. Any Exception raised
  along the way is caught and returned as a value so the caller can inspect
  it instead of unwinding."
  [kafka-consumer data-channel]
  (try
    ;; Simulate restarting of actor in case of failure
    #_(throw (new Exception "hello"))
    (let [resumed-consumer (c/resume-consumer kafka-consumer)
          polled-records (c/poll-consumer-records! resumed-consumer 500)]
      (consumer-records->data-chan-fn polled-records data-channel))
    (let [assignment (c/current-assignment kafka-consumer)]
      (c/pause-consumer kafka-consumer assignment))
    (catch Exception failure
      ;; Exception handling here
      failure)))
(defn commit-records
  "Builds a TopicPartition -> OffsetAndMetadata map from `consumer-records`
  (each record is converted with `j/to-java`; a later record for the same key
  replaces an earlier one) and commits it through `c/commit-consumer` with
  1000 as the timeout argument.

  Returns the result of the commit, or the caught Exception as a value when
  conversion or the commit fails."
  [kafka-consumer consumer-records]
  (try
    (let [record->entry (fn [record]
                          [(j/to-java TopicPartition record)
                           (j/to-java OffsetAndMetadata record)])
          offsets (reduce (fn [acc record]
                            (conj acc (record->entry record)))
                          {}
                          consumer-records)]
      (c/commit-consumer kafka-consumer offsets 1000))
    (catch Exception failure
      failure)))
(defn poll-loop-inbox-actions-handler
  "Dispatches one inbox message of the shape `{:action ... :arg ...}`.

  `:commit` commits the records carried in `:arg`; `:next` fetches the next
  batch into `data-channel`. Returns the handler's result, which is an
  Exception instance when the underlying consumer operation failed. Any other
  action throws IllegalArgumentException (no matching clause)."
  [kafka-consumer data-channel {:keys [action arg]}]
  (case action
    :commit (commit-records kafka-consumer arg)
    :next (fetch-next-records kafka-consumer data-channel)))
(defn stop-poll-and-channels
  "Tears down a running poll loop: closes every channel in `channels`, closes
  the consumer stored under `:kafka-consumer` in the `state` atom (1000 is
  passed as the close timeout argument), and finally records
  `next-actor-state` under `:actor-state`.

  Returns the new value of the state atom, as returned by `swap!`."
  [state next-actor-state & channels]
  (let [consumer (:kafka-consumer @state)]
    (println "Stopping consumer loop")
    (run! #(sp/close! %) channels)
    (c/close-consumer consumer 1000)
    (swap! state assoc :actor-state next-actor-state)))
;;;;;;;;;;;;;;;;;;;;;;;;;;
(defprotocol IActor
  "Lifecycle and messaging contract for an actor."
  (init [this config]
    "Prepares the actor with `config` before it is started.")
  (start [this]
    "Starts the actor.")
  (stop [this]
    "Stops the actor.")
  (restart [this]
    "Restarts the actor.")
  (on-message [this message] [this message args]
    "Delivers `message`, optionally with `args`, to the actor."))
;; TODO: Use deftype instead of keeping state in an atom in a defrecord?
;; Kafka consumer actor with pull semantics.
;;
;; `state` is an atom holding a map with (once started) the keys
;; :actor-state, :init-config, :kafka-consumer, :inbox-chan, :signal-chan,
;; :data-chan and :poll-loop. :actor-state moves between :not-started,
;; :running, :stopped and :errored.
;;
;; The consumer is kept paused between requests; a background poll loop
;; resumes it only when a {:action :next} message arrives on the inbox
;; channel, and otherwise polls the paused consumer about once a second.
(defrecord KafkaConsumerActorRecord
  [name state]
  IActor
  ;; Seeds the state atom with the configuration. Only the first call has an
  ;; effect, so re-initialising never clobbers a live actor. Returns `this`.
  (init [this init-config]
    (when (empty? @state)
      (swap! state assoc
             :actor-state :not-started
             :init-config init-config))
    this)
  ;; Creates the consumer, channels and poll loop, then publishes them in the
  ;; state atom. A no-op unless the actor is :not-started, :stopped or
  ;; :errored. Returns `this`.
  (start [this]
    (let [{:keys [actor-state]
           {:keys [bootstrap-servers topics start-timeout]} :init-config} @state]
      ;; :stopped must be startable, otherwise `restart` (stop followed by
      ;; start) leaves a previously running actor permanently stopped.
      (if-not (#{:errored :not-started :stopped} actor-state)
        this
        (let [^KafkaConsumer kafka-consumer (-> bootstrap-servers
                                                c/config
                                                c/consumer
                                                (c/subscribe-consumer topics))
              inbox-chan (sp/chan (sp/sliding-buffer 100))
              signal-chan (sp/chan (sp/sliding-buffer 100))
              data-chan (sp/chan 100)
              ;; TODO: first poll is necessary to get assignments for pausing consumer
              ;; TODO: Handle below failure?
              _ (-> kafka-consumer
                    (c/poll-consumer-records! 500)
                    (consumer-records->data-chan-fn data-chan))
              _ (c/pause-consumer kafka-consumer (c/current-assignment kafka-consumer))
              poll-loop (sp/go-loop
                          []
                          ;; :priority true makes a pending stop signal win
                          ;; over inbox work and over the idle timeout.
                          (let [[v ch] (sp/alts! [signal-chan inbox-chan (sp/timeout-chan 1000)] :priority true)]
                            (cond
                              (and (= ch signal-chan) (= v :stop))
                              (stop-poll-and-channels state :stopped inbox-chan signal-chan data-chan)

                              (= ch inbox-chan)
                              (let [result (poll-loop-inbox-actions-handler kafka-consumer data-chan v)]
                                (if-not (instance? Exception result)
                                  (recur)
                                  ;; Restart actor in case of failure with consumer-operations
                                  (do
                                    (stop-poll-and-channels state :errored inbox-chan signal-chan data-chan)
                                    (println "restarting actor system")
                                    (restart this))))

                              ;; Idle tick: poll the (paused) consumer, discarding the result.
                              :else
                              (do
                                (c/poll-consumer-records! kafka-consumer 500)
                                (recur)))))
              ;; nil means the loop is still running after `start-timeout`;
              ;; a non-nil value means the loop has already terminated.
              loop-result (p/await! poll-loop start-timeout)]
          ;; Only publish :running while the loop is alive. If it already
          ;; finished, it recorded its own final state and must not be
          ;; overwritten (its return value is a state map or the record,
          ;; never a valid :actor-state keyword).
          (when (nil? loop-result)
            (swap! state assoc
                   :kafka-consumer kafka-consumer
                   :inbox-chan inbox-chan
                   :signal-chan signal-chan
                   :data-chan data-chan
                   :poll-loop poll-loop
                   :actor-state :running))
          this))))
  ;; Signals the poll loop to stop and waits for it to finish tearing down.
  ;; Always returns `this`, including when the actor is not running, so the
  ;; result can be threaded into `start`.
  (stop [this]
    (let [{:keys [actor-state signal-chan poll-loop]} @state]
      (when (= :running actor-state)
        (sp/put! signal-chan :stop)
        ;; Block until the loop has closed the consumer and channels.
        @poll-loop)
      this))
  ;; Stops the actor if needed and starts it again. An :errored actor has
  ;; already been torn down by the poll loop, so it is started directly.
  (restart [this]
    (let [{:keys [actor-state]} @state]
      (if (not= :errored actor-state)
        (-> this stop start)
        (start this))))
  (on-message [this message]
    (on-message this message nil))
  ;; Handles :state, :next and :commit. Returns the value of `sp/go`, which
  ;; callers dereference to obtain the result:
  ;;   :state  -> current :actor-state
  ;;   :next   -> next batch from the data channel, or nil if none arrived
  ;;              in time or the actor is not running
  ;;   :commit -> enqueues a commit of `args` (ignored when `args` is empty
  ;;              or the actor is not running)
  (on-message [this message args]
    (sp/go
      (let [{:keys [data-chan inbox-chan actor-state]} @state]
        (condp = message
          :state actor-state
          :next (when (= actor-state :running)
                  ;; Prefer an already-buffered batch; otherwise ask the poll
                  ;; loop for one and wait for it with a bounded take.
                  (let [[v ch] (sp/alts! [data-chan (sp/timeout-chan 50)] :priority true)]
                    (if (= ch data-chan)
                      v
                      (do
                        (sp/put! inbox-chan {:action :next})
                        (sp/take! data-chan 500)))))
          :commit (when (and (= actor-state :running) (seq args))
                    (sp/put! inbox-chan {:action :commit
                                         :arg args})))))))
(defn ->kafka-consumer-actor-record
  "Builds a KafkaConsumerActorRecord called `name` with a fresh, empty state
  atom and initialises it with `config`. Returns the initialised actor."
  [name config]
  (let [fresh-state (atom {})
        actor (->KafkaConsumerActorRecord name fresh-state)]
    (init actor config)))
;; REPL walkthrough; nothing inside this form is evaluated on load.
(comment
;; Build and initialise an actor from a config map.
(def consumer-actor (->kafka-consumer-actor-record
"test-actor"
{:bootstrap-servers "localhost:9092"
:topics ["hello_world"]
:start-timeout 200}))
(start consumer-actor)
;; on-message returns the result of `sp/go`, hence the deref.
(def b2 (seq @(on-message consumer-actor :next)))
;; Commit the batch that was just fetched.
(def c @(on-message consumer-actor :commit b2))
(stop consumer-actor)
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment