Skip to content

Instantly share code, notes, and snippets.

@jeans11

jeans11/core.clj Secret

Last active June 25, 2024 22:27
Show Gist options
  • Save jeans11/295b112c93eebb175e2245a276738d9a to your computer and use it in GitHub Desktop.
Save jeans11/295b112c93eebb175e2245a276738d9a to your computer and use it in GitHub Desktop.
Missionary + Rama
(ns core
(:require [missionary.core :as m])
(:import
(com.rpl.rama Depot RamaModule CompoundAgg PState Agg Path ProxyState ProxyState$Callback)
(com.rpl.rama.module StreamTopology)
(com.rpl.rama.ops Ops)
(com.rpl.rama.test InProcessCluster LaunchConfig)))
(deftype SimpleWordCountModule []
RamaModule
(define [_ setup topologies]
(.declareDepot setup "*depot" (Depot/random))
(let [s (.stream topologies "s")]
(.pstate s "$$word-counts" (PState/mapSchema String Long))
(-> (.source s "*depot")
(.out (into-array String ["*word"]))
(.hashPartition "*word")
(.each Ops/PRINTLN "depot receive the word: " "*word")
(.compoundAgg "$$word-counts" (CompoundAgg/map (into-array Object ["*word" (Agg/count)])))))))
(deftype ProxyCallback [f]
ProxyState$Callback
(change [_ new diff old]
(when new
(f new))))
(defn run [^RamaModule module ^InProcessCluster cluster]
(.launchModule cluster module (LaunchConfig. 1 1)))
(defn make-reactive-pstate [pstate pkey]
(->> (m/observe
(fn [!]
(! nil)
(let [p (.proxy pstate
(Path/key (into-array String [pkey]))
(->ProxyCallback !))]
#(.close p))))
(m/reduce (fn [_ value]
(println (format "The value of the %s key is now: %s" pkey value)))
nil)))
(comment
;; Start the Rama cluster
(def cluster (InProcessCluster/create))
;; Launch a module into the cluster
(run (->SimpleWordCountModule) cluster)
(def simple-count-module-name (.getName SimpleWordCountModule))
;; Access the module depot
(def depot (.clusterDepot cluster simple-count-module-name "*depot"))
;; Access the module pstate
(def word-counts (.clusterPState cluster simple-count-module-name "$$word-counts"))
;; Init the reactive pstate
(def run-reactive-bike-pstate (make-reactive-pstate word-counts "bike"))
(def run-reactive-bus-pstate (make-reactive-pstate word-counts "bus"))
(def run-reactive-car-pstate (make-reactive-pstate word-counts "car"))
;; Run the pstate stream
(run-reactive-bike-pstate #(do nil) #(do nil))
(run-reactive-bus-pstate #(do nil) #(do nil))
(run-reactive-car-pstate #(do nil) #(do nil))
;; Add some data to the depot
(.append depot "bike")
(.append depot "car")
(.append depot "bus")
(.append depot "bus")
(.append depot "bus")
(.append depot "bike")
(.close cluster)
nil)
@dustingetz
Copy link

rama_missionary-converted.mov

@divs1210
Copy link

Thanks for the demo, @jeans11 and @dustingetz!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment