(ns digital-wallet.main-test-old | |
(:require [clojure.test :refer [deftest]] | |
[midje.sweet :refer [fact =>]]) | |
(:import (clojure.lang ILookup) | |
(java.lang AutoCloseable) | |
(java.util Properties) | |
(org.apache.kafka.common.serialization Serdes Serdes$StringSerde Serdes$LongSerde) | |
(org.apache.kafka.streams TopologyTestDriver StreamsConfig Topology KeyValue) | |
(org.apache.kafka.streams.processor.api ProcessorSupplier Processor ProcessorContext Record) | |
(org.apache.kafka.streams.state Stores KeyValueStore) | |
(java.time Duration) | |
(org.apache.kafka.streams.processor PunctuationType Punctuator))) | |
;; Ask the compiler to warn whenever an interop call is resolved by reflection.
(set! *warn-on-reflection* true)

(comment
  ;; Transaction entry model.
  ;; (The same shape is used for the provider statements and the BinoBank
  ;; statements.)
  [{:transaction-id   "{{UUID}}"
    :description      "CASH{{IN|OUT}} VIA {{TYPE}}"
    :transaction-type "PIX|CARD"
    :entry-date       "yyyy-MM-dd'T'HH:mm:ss"
    :amount           1000
    ;; NOTE: CARD transactions are always DEBIT.
    :type             "CREDIT|DEBIT"}]

  ;; Webhook notification model.
  ;; The message itself carries no useful data; it only acts as an alert.
  {:message "new transaction for you and only you"}

  ;; User model (shared resource).
  {:balance 0})
(defn ^AutoCloseable ->app
  "Builds a max-per-key aggregation topology on top of a `TopologyTestDriver`
  and returns a handle for tests.

  Topology: source \"sourceProcessor\" reads \"input-topic\"; processor
  \"aggregator\" keeps, per key, the largest value seen in the in-memory state
  store \"aggStore\"; sink \"sinkProcessor\" writes to \"result-topic\". A
  punctuator, scheduled both on wall-clock time (60 s) and on stream time
  (10 s), forwards every store entry downstream.

  The store is seeded with \"a\" -> 21 before the handle is returned.

  The returned object
  - supports lookup (`get`, keyword invocation, map destructuring) of
    `::input-topic`, `::output-topic`, `::store` and `::test-driver`,
    including the not-found arity of `get`;
  - is `AutoCloseable`; closing it closes the test driver, so it is meant to
    be used with `with-open`."
  []
  (let [;; The store is only obtainable from the test driver, i.e. after the
        ;; topology (and therefore the processor) has been built, so both the
        ;; store and the processor context are handed over through promises.
        *store (promise)
        *ctx (promise)
        flush-storage-punctuator
        (reify Punctuator
          (punctuate [this time]
            (let [^KeyValueStore store @*store
                  ^ProcessorContext ctx @*ctx]
              ;; `.all` returns a closeable iterator; `doseq` is eager, so the
              ;; iterator is fully consumed before `with-open` closes it.
              (with-open [entries (.all store)]
                (doseq [^KeyValue kv (iterator-seq entries)]
                  (.forward ctx (Record. (.key kv) (.value kv) time)))))))
        processor-supplier
        (reify ProcessorSupplier
          (get [this]
            (reify Processor
              (init [this ctx]
                (deliver *ctx ctx)
                ;; The same flush runs on both clocks.
                (.schedule ctx
                           (Duration/ofSeconds 60)
                           PunctuationType/WALL_CLOCK_TIME
                           flush-storage-punctuator)
                (.schedule ctx
                           (Duration/ofSeconds 10)
                           PunctuationType/STREAM_TIME
                           flush-storage-punctuator))
              (process [this record]
                (let [key (.key record)
                      value (.value record)
                      ^KeyValueStore store @*store
                      old-value (.get store key)]
                  ;; Only overwrite when there is no value yet or the new one
                  ;; is strictly larger.
                  (when (or (nil? old-value)
                            (> value old-value))
                    (.put store key value)))))))
        store-builder (-> (Stores/keyValueStoreBuilder
                            (Stores/inMemoryKeyValueStore "aggStore")
                            (Serdes/String)
                            (Serdes/Long))
                          (.withLoggingDisabled))
        topology (doto (Topology.)
                   (.addSource "sourceProcessor"
                               ^"[Ljava.lang.String;" (into-array ["input-topic"]))
                   (.addProcessor "aggregator"
                                  processor-supplier
                                  ^"[Ljava.lang.String;" (into-array ["sourceProcessor"]))
                   (.addStateStore store-builder
                                   ^"[Ljava.lang.String;" (into-array ["aggregator"]))
                   (.addSink "sinkProcessor"
                             "result-topic"
                             ^"[Ljava.lang.String;" (into-array ["aggregator"])))
        props (doto (Properties.)
                (.setProperty StreamsConfig/APPLICATION_ID_CONFIG "maxAggregation")
                (.setProperty StreamsConfig/BOOTSTRAP_SERVERS_CONFIG "dummy:1234")
                (.setProperty StreamsConfig/DEFAULT_KEY_SERDE_CLASS_CONFIG
                              (-> (Serdes/String) class .getName))
                (.setProperty StreamsConfig/DEFAULT_VALUE_SERDE_CLASS_CONFIG
                              (-> (Serdes/Long) class .getName)))
        test-driver (TopologyTestDriver. topology props)
        string-serde (Serdes$StringSerde.)
        long-serde (Serdes$LongSerde.)
        input-topic (.createInputTopic test-driver "input-topic"
                                       (.serializer string-serde)
                                       (.serializer long-serde))
        output-topic (.createOutputTopic test-driver "result-topic"
                                         (.deserializer string-serde)
                                         (.deserializer long-serde))
        store (.getKeyValueStore test-driver "aggStore")
        kv {::output-topic output-topic
            ::input-topic input-topic
            ::store store
            ::test-driver test-driver}]
    (deliver *store store)
    ;; Seed value the tests rely on.
    (.put store "a" 21)
    (reify
      ILookup
      (valAt [this key]
        (get kv key))
      ;; Without this arity `(get app k default)` fails with
      ;; AbstractMethodError.
      (valAt [this key not-found]
        (get kv key not-found))
      AutoCloseable
      (close [this]
        (.close test-driver)))))
(deftest should-flush-store-for-first-input
  ;; After piping ("a", 1) the output topic is expected to hold exactly one
  ;; record: ("a", 21).
  (with-open [harness (->app)]
    (let [in  (get harness ::input-topic)
          out (get harness ::output-topic)]
      (.pipeInput in "a" 1)
      (fact (.readKeyValue out) => (KeyValue. "a" 21))
      (fact (.isEmpty out) => true))))
(deftest should-not-update-store-for-smaller-value
  ;; Piping ("a", 1) is expected to leave the stored value for "a" at 21 and
  ;; to produce exactly one output record: ("a", 21).
  (with-open [harness (->app)]
    (let [in      (get harness ::input-topic)
          out     (get harness ::output-topic)
          storage (get harness ::store)]
      (.pipeInput in "a" 1)
      (fact (.get storage "a") => 21)
      (fact (.readKeyValue out) => (KeyValue. "a" 21))
      (fact (.isEmpty out) => true))))
(deftest should-update-store-for-larger-value
  ;; Piping ("a", 42) is expected to replace the stored value for "a" with 42
  ;; and to produce exactly one output record: ("a", 42).
  (with-open [harness (->app)]
    (let [in      (get harness ::input-topic)
          out     (get harness ::output-topic)
          storage (get harness ::store)]
      (.pipeInput in "a" 42)
      (fact (.get storage "a") => 42)
      (fact (.readKeyValue out) => (KeyValue. "a" 42))
      (fact (.isEmpty out) => true))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment