Skip to content

Instantly share code, notes, and snippets.

@cddr
Last active February 15, 2017 10:48
Show Gist options
  • Save cddr/cc2c12c61236c86a816046573e210968 to your computer and use it in GitHub Desktop.
Save cddr/cc2c12c61236c86a816046573e210968 to your computer and use it in GitHub Desktop.
(defn holdings-manager
"Returns a collection of kstreams, one for each different way in which
loan ownership state may be updated"
[puma-events]
(let [ownership-store (k/store "loan-ownership")]
(-> (k/kstream puma-events)
(k/transform (update-ownership ownership-store))
(k/branch [trade-result?
repayment-result?]))])
(defmulti build-changelog-entry
"Produces a changelog entry from a loan ownership update 'result'. The
result should contain enough information that would allow a process
to construct the resulting entry on the changelog."
[puma-result-event]
{:id "new id"})
(defmethod product-unit-changes :trade-result [trade-result]
(concat
(buyer-side-changes trade-result)
(seller-side-changes trade-result)))
(defmethod product-unit-changes :repayment-result [repayment-result]
(concat
(buyer-side-changes repayment-result)
(seller-side-changes repayment-result)))
(defn holdings-manager-job
"Runs the holdings manager and materializes the output topics"
[]
(let [builder (topology-builder)
trade-requests (k/kstream (in/trade-requests))
repayment-requests (k/stream (in/repayment-requests))]
(let [[trade-results repayment-results]
(-> (holdings-manager
(k/merge [trade-requests repayment-requests])))]
(k/to! builder trade-results (out/trade-results))
(k/to! builder repayment-results (out/repayment-results)))))
(defn holdings-manager-logger
"Monitors result topics for changes to loan ownership and emits the
changelog topic."
[]
(let [builder (topology-builder)
trade-results (k/stream (in/trade-results))
repayment-results (k/kstream (in/repayment-results))]
(let [all-results (k/merge builder [trade-results repayment-results])]
(-> all-results
(k/flat-map build-changelog-entry)
(k/through (out/product-unit-changelog))))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment