-
-
Save jeans11/295b112c93eebb175e2245a276738d9a to your computer and use it in GitHub Desktop.
Missionary + Rama
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns core | |
(:require [missionary.core :as m]) | |
(:import | |
(com.rpl.rama Depot RamaModule CompoundAgg PState Agg Path ProxyState ProxyState$Callback) | |
(com.rpl.rama.module StreamTopology) | |
(com.rpl.rama.ops Ops) | |
(com.rpl.rama.test InProcessCluster LaunchConfig))) | |
;; Rama module that keeps a per-word count of everything appended to its depot.
;;
;; `define` declares:
;;   * a depot named "*depot" (created with `Depot/random`),
;;   * a stream topology named "s",
;;   * a PState named "$$word-counts" with a String -> Long map schema.
;;
;; The topology reads each depot record as "*word", hash-partitions on it,
;; prints it, and feeds it to a compound aggregation that maps "*word" to
;; `Agg/count` inside "$$word-counts".
(deftype SimpleWordCountModule []
  RamaModule
  (define [_ setup topologies]
    (.declareDepot setup "*depot" (Depot/random))
    (let [stream (.stream topologies "s")]
      ;; The PState must be declared on the topology before the dataflow writes to it.
      (.pstate stream "$$word-counts" (PState/mapSchema String Long))
      (-> stream
          (.source "*depot")
          ;; Bind each incoming depot record to the dataflow variable "*word".
          (.out (into-array String ["*word"]))
          (.hashPartition "*word")
          (.each Ops/PRINTLN "depot receive the word: " "*word")
          (.compoundAgg "$$word-counts"
                        (CompoundAgg/map (into-array Object ["*word" (Agg/count)])))))))
;; Adapts a one-argument Clojure function `f` to Rama's `ProxyState$Callback`.
;; On every change notification the new value is forwarded to `f`, unless it is
;; nil/false, in which case nothing is called. The diff and the previous value
;; are ignored.
(deftype ProxyCallback [f]
  ProxyState$Callback
  (change [_ value _diff _previous]
    (when-let [current value]
      (f current))))
(defn run
  "Launches `module` on the in-process `cluster` with a `LaunchConfig` built
  from the arguments 1 and 1, and returns whatever `.launchModule` returns."
  [^RamaModule module ^InProcessCluster cluster]
  (let [launch-config (new LaunchConfig 1 1)]
    (.launchModule cluster module launch-config)))
(defn make-reactive-pstate
  "Builds a missionary task that watches the `pkey` entry of `pstate` and prints
  a line for every value it observes.

  The underlying flow first emits nil, then opens a Rama proxy on the path
  `(Path/key pkey)`; each change reported through `ProxyCallback` is pushed into
  the flow. The cleanup function handed back to `m/observe` closes the proxy.

  Nothing happens until the returned task is run."
  [pstate pkey]
  (let [report  (fn [_ value]
                  (println (format "The value of the %s key is now: %s" pkey value)))
        subject (fn [emit!]
                  ;; Seed the flow with an initial nil before any proxy event arrives.
                  (emit! nil)
                  (let [proxy-state (.proxy pstate
                                            (Path/key (into-array String [pkey]))
                                            (->ProxyCallback emit!))]
                    ;; Cleanup thunk required by m/observe.
                    (fn [] (.close proxy-state))))]
    (m/reduce report nil (m/observe subject))))
;; REPL walkthrough: evaluate the forms one at a time, top to bottom.
(comment
  ;; Start the Rama cluster
  (def cluster (InProcessCluster/create))
  ;; Launch a module into the cluster
  (run (->SimpleWordCountModule) cluster)
  (def simple-count-module-name (.getName SimpleWordCountModule))
  ;; Access the module depot
  (def depot (.clusterDepot cluster simple-count-module-name "*depot"))
  ;; Access the module pstate
  (def word-counts (.clusterPState cluster simple-count-module-name "$$word-counts"))
  ;; Init the reactive pstate
  (def run-reactive-bike-pstate (make-reactive-pstate word-counts "bike"))
  (def run-reactive-bus-pstate (make-reactive-pstate word-counts "bus"))
  (def run-reactive-car-pstate (make-reactive-pstate word-counts "car"))
  ;; Run the pstate stream.
  ;; A missionary task is called with a success and a failure continuation, each
  ;; receiving ONE argument, and returns a zero-argument canceller.
  ;; `(constantly nil)` accepts that argument (a zero-arity `#(do nil)` would throw
  ;; an ArityException when invoked), and the cancellers are kept so the proxies
  ;; can be closed later.
  (def cancel-bike-pstate (run-reactive-bike-pstate (constantly nil) (constantly nil)))
  (def cancel-bus-pstate (run-reactive-bus-pstate (constantly nil) (constantly nil)))
  (def cancel-car-pstate (run-reactive-car-pstate (constantly nil) (constantly nil)))
  ;; Add some data to the depot
  (.append depot "bike")
  (.append depot "car")
  (.append depot "bus")
  (.append depot "bus")
  (.append depot "bus")
  (.append depot "bike")
  ;; Stop the observers (this closes their proxies) before shutting the cluster down
  (cancel-bike-pstate)
  (cancel-bus-pstate)
  (cancel-car-pstate)
  (.close cluster)
  nil)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
rama_missionary-converted.mov