Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@cddr
Last active October 8, 2019 23:52
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cddr/d2564689a9cd10367bd0d2a86e860016 to your computer and use it in GitHub Desktop.
Save cddr/d2564689a9cd10367bd0d2a86e860016 to your computer and use it in GitHub Desktop.
(ns mock-avro-example
(:require
[clojure.data.json :as json]
[jackdaw.streams :as k]
[jackdaw.serdes.avro :as avro]
[jackddaw.serdes.avro.schema-registry :as reg]
[jackdaw.test :as jdt]))
(def foo-schema
{:type :record
:name "foo"
:namespace "big.data.co"
:fields [{:name "id"
:type :string}
{:name "amount"
:type :long}
{:name "updated_at"
:type {:type :long
:logicalType "tmestamp-millis"}}]})
(defn avro-serde
[schema key? client-options]
(let [schema-options {:avro/schema (json/write-str schema)
:key? key?}]
(avro/serde avro/+base-schema-type-registry+
client-opts
schema-opts)))
(defn topic-config
[{:keys [registry-url registry-client]}]
(let [client-options {:avro.schema-registry/client registry-client
:avro.schema-registry/url registry-url}]
{:foo {:topic-name "foo"
:replication-factor 1
:partition-count 1
:key-serde (avro-serde {:type "string"} true client-options)
:value-serde (avro-serde foo-schema false client-options)}
:bar {:topic-name "foo"
:replication-factor 1
:partition-count 1
:key-serde (avro-serde {:type "string"} true client-options)
:value-serde (avro-serde bar-schema false client-options)}}))
(defn topology-builder
[topics builder]
(-> (k/stream builder (:foo topics))
(k/map transform-foos)
(k/to (:bar topics)))
builder)
(deftest test-topology-builder
(let [topics (topic-config {:registry-url "http://schema-registry.test"
:registry-client (reg/mock-client)})]
(jdt/with-test-machine (jdt/mock-transport {:driver (jdt/mock-test-driver #(topology-builder topics %))})
(fn [machine]
(let [{:keys [journal results]} (jdt/run-test machine test-commands)]
;; then in here perform the assertions
)))))
(defn -main [& args]
;; This represents where you'd start the topology in a live env
(let [registry-url "https://schema-registry.ccloud:8081"
builder (k/streams-builder)
topics (topic-registry {:registry-url registry-url
:registry-client (reg/client registry-url 100)})]
(-> (topology-builder topics builder)
(k/kafka-streams {"bootstrap.servers" "confluent.cloud:9092"
"group.id" "my-app-group"
"application.id" "my-app-id"})
(k/start))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment