Skip to content

Instantly share code, notes, and snippets.

Created Feb 7, 2021
What would you like to do?
(ns digital-wallet.main-test-old
(:require [clojure.test :refer [deftest]]
[midje.sweet :refer [fact =>]])
(:import (clojure.lang ILookup)
(java.lang AutoCloseable)
(java.util Properties)
(org.apache.kafka.common.serialization Serdes Serdes$StringSerde Serdes$LongSerde)
(org.apache.kafka.streams TopologyTestDriver StreamsConfig Topology KeyValue)
(org.apache.kafka.streams.processor.api ProcessorSupplier Processor ProcessorContext Record)
(org.apache.kafka.streams.state Stores KeyValueStore)
(java.time Duration)
(org.apache.kafka.streams.processor PunctuationType Punctuator)))
(set! *warn-on-reflection* true)
;; This is the model of a transaction entry
;; (use this model for the providers and BinoBank statements)
[{:transaction-id "{{UUID}}"
:description "CASH{{IN|OUT}} VIA {{TYPE}}"
:transaction-type "PIX|CARD"
:entry-date "yyyy-MM-dd'T'HH:mm:ss"
:amount 1000
;; obs: CARD transactions are only DEBIT
:type "CREDIT|DEBIT"}]
;; This is the model of a webhook notification
{:message "new transaction for you and only you"}
;; useless message, just an alert
;; This is the model of the user (shared resource)
{:balance 0})
(defn ^AutoCloseable ->app
(let [*store (promise)
*ctx (promise)
flush-storage-punctuator (reify Punctuator
(punctuate [this time]
(let [^KeyValueStore store @*store
^ProcessorContext ctx @*ctx]
(doseq [^KeyValue kv (iterator-seq (.all store))]
(.forward ctx (Record. (.key kv)
(.value kv)
processor-supplier (reify ProcessorSupplier
(get [this]
(reify Processor
(init [this ctx]
(deliver *ctx ctx)
(.schedule ctx
(Duration/ofSeconds 60)
(.schedule ctx
(Duration/ofSeconds 10)
(process [this record]
(let [key (.key record)
value (.value record)
^KeyValueStore store @*store
old-value (.get store key)]
(when (or (nil? old-value)
(> value old-value))
(.put store key value)))))))
store-builder (-> (Stores/keyValueStoreBuilder
(Stores/inMemoryKeyValueStore "aggStore")
topology (doto (Topology.)
(.addSource "sourceProcessor"
^"[Ljava.lang.String;" (into-array ["input-topic"]))
(.addProcessor "aggregator"
^"[Ljava.lang.String;" (into-array ["sourceProcessor"]))
(.addStateStore store-builder ^"[Ljava.lang.String;" (into-array ["aggregator"]))
(.addSink "sinkProcessor" "result-topic" ^"[Ljava.lang.String;" (into-array ["aggregator"])))
props (doto (Properties.)
(.setProperty StreamsConfig/APPLICATION_ID_CONFIG "maxAggregation")
(.setProperty StreamsConfig/BOOTSTRAP_SERVERS_CONFIG "dummy:1234")
(.setProperty StreamsConfig/DEFAULT_KEY_SERDE_CLASS_CONFIG (-> (Serdes/String) class .getName))
(.setProperty StreamsConfig/DEFAULT_VALUE_SERDE_CLASS_CONFIG (-> (Serdes/Long) class .getName)))
test-driver (TopologyTestDriver. topology props)
string-serde (Serdes$StringSerde.)
long-serde (Serdes$LongSerde.)
input-topic (.createInputTopic test-driver "input-topic"
(.serializer string-serde) (.serializer long-serde))
output-topic (.createOutputTopic test-driver "result-topic"
(.deserializer string-serde) (.deserializer long-serde))
store (.getKeyValueStore test-driver "aggStore")
kv {::output-topic output-topic
::input-topic input-topic
::store store
::test-driver test-driver}]
(deliver *store store)
(.put store "a" 21)
(valAt [this key]
(kv key))
(close [this]
(.close test-driver)))))
(deftest should-flush-store-for-first-input
(with-open [app (->app)]
(let [{::keys [input-topic
output-topic]} app]
(.pipeInput input-topic "a" 1)
(.readKeyValue output-topic)
=> (KeyValue. "a" 21))
(.isEmpty output-topic)
=> true))))
(deftest should-not-update-store-for-smaller-value
(with-open [app (->app)]
(let [{::keys [input-topic
store]} app]
(.pipeInput input-topic "a" 1)
(.get store "a")
=> 21)
(.readKeyValue output-topic)
=> (KeyValue. "a" 21))
(.isEmpty output-topic)
=> true))))
(deftest should-update-store-for-larger-value
(with-open [app (->app)]
(let [{::keys [input-topic
store]} app]
(.pipeInput input-topic "a" 42)
(.get store "a")
=> 42)
(.readKeyValue output-topic)
=> (KeyValue. "a" 42))
(.isEmpty output-topic)
=> true))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment