-
-
Save fr33m0nk/0958c61377d429e8690a8d7fe0ee9cc4 to your computer and use it in GitHub Desktop.
Kafka consumer actor model based on pull semantics
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns clj-actor.promesa-actor | |
(:require | |
[clojure.java.data :as j] | |
[nio-kafka.consumer :as c] | |
[promesa.core :as p] | |
[promesa.exec.csp :as sp]) | |
(:import (org.apache.kafka.clients.consumer ConsumerRecords KafkaConsumer OffsetAndMetadata) | |
(org.apache.kafka.common TopicPartition))) | |
;;;;;;;;;;;;;;;;;;;;;;;;;; Helper fns
(defn consumer-records->data-chan-fn
  "Forwards a polled batch to `channel`.

  Empty batches are dropped and yield nil; otherwise the whole
  `consumer-records` object is put on `channel` as a single item and the
  result of `sp/put!` is returned."
  [^ConsumerRecords consumer-records channel]
  (if (.isEmpty consumer-records)
    nil
    (sp/put! channel consumer-records)))
(defn fetch-next-records
  "Performs one pull cycle against `kafka-consumer`:

  1. resume the consumer,
  2. poll once (500 is handed to `c/poll-consumer-records!`; presumably a
     timeout in milliseconds - confirm against nio-kafka),
  3. forward a non-empty batch to `data-channel`,
  4. pause the consumer's current assignment again.

  Returns the result of `c/pause-consumer` on success. Any `Exception` is
  caught and returned as a value (not thrown) so the caller can inspect it."
  [kafka-consumer data-channel]
  (try
    (let [resumed (c/resume-consumer kafka-consumer)
          polled  (c/poll-consumer-records! resumed 500)]
      (consumer-records->data-chan-fn polled data-channel)
      ;; Pause again so idle polls between pull requests fetch nothing.
      (c/pause-consumer kafka-consumer (c/current-assignment kafka-consumer)))
    (catch Exception ex
      ;; The failure is reported to the caller as the return value.
      ex)))
(defn commit-records
  "Commits offsets for `consumer-records` on `kafka-consumer`.

  Each record is converted with `clojure.java.data/to-java` into a
  `TopicPartition` key and an `OffsetAndMetadata` value; the pairs are
  collected into a map (a later record for the same partition replaces an
  earlier one) which is passed to `c/commit-consumer` together with 1000
  (presumably a timeout in milliseconds - confirm against nio-kafka).

  Returns the result of `c/commit-consumer`, or the caught `Exception` as a
  value when anything fails."
  [kafka-consumer consumer-records]
  (try
    (let [partition->offset (into {}
                                  (for [record consumer-records]
                                    [(j/to-java TopicPartition record)
                                     (j/to-java OffsetAndMetadata record)]))]
      (c/commit-consumer kafka-consumer partition->offset 1000))
    (catch Exception ex
      ex)))
(defn poll-loop-inbox-actions-handler
  "Dispatches one inbox message of the shape `{:action ... :arg ...}`.

  `:commit` commits the records carried in `:arg`; `:next` fetches the next
  batch into `data-channel`. Returns whatever the chosen helper returns,
  which may be an `Exception` value. Any other action throws
  `IllegalArgumentException` (no matching clause)."
  [kafka-consumer data-channel {:keys [action arg]}]
  (case action
    :commit (commit-records kafka-consumer arg)
    :next   (fetch-next-records kafka-consumer data-channel)))
(defn stop-poll-and-channels
  "Shuts down the resources owned by a poll loop and records the outcome.

  Closes every channel in `channels`, closes the `:kafka-consumer` found in
  the `state` atom (1000 is handed to `c/close-consumer`; presumably a
  timeout in milliseconds - confirm against nio-kafka) and finally sets
  `:actor-state` to `next-actor-state`.

  The state transition happens in a `finally` block: if closing a channel or
  the consumer throws, the exception still propagates, but the actor is no
  longer recorded as being in its previous state.

  Returns the state map after the transition."
  [state next-actor-state & channels]
  (let [{:keys [kafka-consumer]} @state]
    (println "Stopping consumer loop")
    (try
      (doseq [chan channels]
        (sp/close! chan))
      (c/close-consumer kafka-consumer 1000)
      (finally
        ;; Always record the new state, even when cleanup failed part-way.
        (swap! state assoc :actor-state next-actor-state)))
    @state))
;;;;;;;;;;;;;;;;;;;;;;;;;;
(defprotocol IActor
  "Lifecycle and messaging contract for an actor."
  (init [this config]
    "Hands `config` to the actor before it is started.")
  (start [this]
    "Starts the actor.")
  (stop [this]
    "Stops the actor.")
  (restart [this]
    "Restarts the actor.")
  (on-message [this message] [this message args]
    "Delivers `message`, optionally with `args`, to the actor."))
;; TODO: Use deftype instead of keeping state in an atom in a defrecord?
;; Pull-based Kafka consumer actor.
;;
;; `name`  - identifier of the actor (not read by any method here).
;; `state` - atom holding a map with :actor-state (:not-started, :running,
;;           :stopped or :errored), :init-config and, once started,
;;           :kafka-consumer, :inbox-chan, :signal-chan, :data-chan and
;;           :poll-loop.
;;
;; All consumer calls after `start` are made from the single poll loop.
;; Callers talk to that loop through `on-message`, which puts requests on
;; :inbox-chan and reads polled batches from :data-chan.
(defrecord KafkaConsumerActorRecord
  [name state]
  IActor
  (init [this init-config]
    ;; Seed the state only once; calling init on an already initialised
    ;; actor leaves its state untouched.
    (when (empty? @state)
      (swap! state assoc
             :actor-state :not-started
             :init-config init-config))
    this)

  (start [this]
    (let [{:keys [actor-state]
           {:keys [bootstrap-servers topics start-timeout]} :init-config} @state]
      ;; :stopped is startable too; otherwise `restart` (stop followed by
      ;; start) would leave a previously running actor permanently stopped.
      (if-not (#{:errored :not-started :stopped} actor-state)
        this
        (let [^KafkaConsumer kafka-consumer (-> bootstrap-servers
                                                c/config
                                                c/consumer
                                                (c/subscribe-consumer topics))
              inbox-chan (sp/chan (sp/sliding-buffer 100))
              signal-chan (sp/chan (sp/sliding-buffer 100))
              data-chan (sp/chan 100)
              ;; TODO: first poll is necessary to get assignments for pausing consumer
              ;; TODO: Handle below failure?
              _ (-> kafka-consumer
                    (c/poll-consumer-records! 500)
                    (consumer-records->data-chan-fn data-chan))
              _ (c/pause-consumer kafka-consumer (c/current-assignment kafka-consumer))
              poll-loop (sp/go-loop
                          []
                          ;; :priority true checks signal-chan before
                          ;; inbox-chan; the timeout channel fires after
                          ;; 1000 with no message on either.
                          (let [[v ch] (sp/alts! [signal-chan inbox-chan (sp/timeout-chan 1000)] :priority true)]
                            (cond
                              (and (= ch signal-chan) (= v :stop))
                              (stop-poll-and-channels state :stopped inbox-chan signal-chan data-chan)

                              (= ch inbox-chan)
                              (let [result (poll-loop-inbox-actions-handler kafka-consumer data-chan v)]
                                (if-not (instance? Exception result)
                                  (recur)
                                  ;; Restart actor in case of failure with consumer-operations
                                  (do
                                    (stop-poll-and-channels state :errored inbox-chan signal-chan data-chan)
                                    (println "restarting actor system")
                                    (restart this))))

                              ;; No request arrived: poll anyway (the
                              ;; consumer was paused above) and discard
                              ;; the result.
                              :else
                              (do
                                (c/poll-consumer-records! kafka-consumer 500)
                                (recur)))))
              ;; Wait up to start-timeout for the loop to finish early.
              ;; NOTE(review): when the loop has already finished, its
              ;; return value (not a state keyword) is stored as
              ;; :actor-state. Kept as in the original - confirm intent.
              early-result (p/await! poll-loop start-timeout)
              next-actor-state (if early-result
                                 early-result
                                 :running)]
          (swap! state assoc
                 :kafka-consumer kafka-consumer
                 :inbox-chan inbox-chan
                 :signal-chan signal-chan
                 :data-chan data-chan
                 :poll-loop poll-loop
                 :actor-state next-actor-state)
          this))))

  (stop [this]
    (let [{:keys [actor-state signal-chan poll-loop]} @state]
      (when (= :running actor-state)
        (sp/put! signal-chan :stop)
        ;; Block until the poll loop has finished shutting down.
        @poll-loop)
      ;; Always return the actor so calls can be threaded, e.g.
      ;; (-> actor stop start), even when it was not running.
      this))

  (restart [this]
    (let [{:keys [actor-state]} @state]
      ;; An :errored actor has already been torn down by the poll loop, so
      ;; it only needs starting; any other state goes through stop first.
      (if (not= :errored actor-state)
        (-> this stop start)
        (start this))))

  (on-message [this message]
    (on-message this message nil))

  (on-message [this message args]
    ;; Runs in an `sp/go` block, so callers receive the block's return value
    ;; and deref it for the result.
    ;;   :state  -> current :actor-state
    ;;   :next   -> a batch from data-chan, or nil (only while :running)
    ;;   :commit -> queues a commit of `args` (only while :running and when
    ;;              `args` is non-empty)
    (sp/go
      (let [{:keys [data-chan inbox-chan actor-state]} @state]
        (condp = message
          :state actor-state
          :next (when (= actor-state :running)
                  ;; Prefer a batch that is already buffered; otherwise ask
                  ;; the poll loop to fetch one and wait for it.
                  (let [[v ch] (sp/alts! [data-chan (sp/timeout-chan 50)] :priority true)]
                    (if (= ch data-chan)
                      v
                      (do
                        (sp/put! inbox-chan {:action :next})
                        (sp/take! data-chan 500)))))
          :commit (when (= actor-state :running)
                    (when (seq args)
                      (sp/put! inbox-chan {:action :commit
                                           :arg args}))))))))
(defn ->kafka-consumer-actor-record
  "Builds a `KafkaConsumerActorRecord` called `name` with a fresh, empty
  state atom and initialises it with `config`. Returns the result of `init`."
  [name config]
  (let [actor (->KafkaConsumerActorRecord name (atom {}))]
    (init actor config)))
(comment
  ;; REPL walkthrough; nothing in this form is evaluated on load.

  ;; 1. Build and initialise an actor.
  (def consumer-actor
    (->kafka-consumer-actor-record
      "test-actor"
      {:bootstrap-servers "localhost:9092"
       :topics            ["hello_world"]
       :start-timeout     200}))

  ;; 2. Create the consumer and launch the poll loop.
  (start consumer-actor)

  ;; 3. Pull the next batch; on-message's result is dereferenced.
  (def batch (seq @(on-message consumer-actor :next)))

  ;; 4. Ask the poll loop to commit the offsets of that batch.
  (def commit-result @(on-message consumer-actor :commit batch))

  ;; 5. Shut the actor down.
  (stop consumer-actor)
  )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment