Last active
August 29, 2015 14:07
-
-
Save roman/66649bd8861375296425 to your computer and use it in GitHub Desktop.
Naive approach for Bg process handling using Rx + Clojure
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns clj-playground.background | |
(:require [rx.lang.clojure.core :as rx] | |
[rx.lang.clojure.interop :as rxi] | |
[clj-http.client :as http] | |
[monads.core :refer [Monad]] | |
[monads.macros :as monadic]) | |
(:import | |
[java.util.concurrent Executors] | |
[rx Observable] | |
[rx.schedulers Schedulers])) | |
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; | |
;; Util implementation of Haskell MVar for syncing threads | |
;; Protocol for a mutable two-slot (cons-like) cell: each slot can be read
;; and overwritten in place.
(defprotocol IMCons
  ;; read the first slot of p
  (mcar [p])
  ;; read the second slot of p
  (mcdr [p])
  ;; overwrite the first slot of p with val
  (set-mcar! [p val])
  ;; overwrite the second slot of p with val
  (set-mcdr! [p val]))
;; Concrete IMCons cell. Both slots are volatile-mutable fields, so a write
;; made through one of the setters is visible to other threads reading the
;; same cell afterwards.
(deftype MCons [^:volatile-mutable car
                ^:volatile-mutable cdr]
  IMCons
  ;; readers
  (mcar [_] car)
  (mcdr [_] cdr)
  ;; writers: each returns the value of the `set!` expression
  (set-mcar! [_ new-car] (set! car new-car))
  (set-mcdr! [_ new-cdr] (set! cdr new-cdr)))
(defn mcons
  "Builds a fresh mutable cell whose first slot holds `a` and whose second
  slot holds `b`."
  [a b]
  (new MCons a b))
;; Plain-function accessors for MCons cells. These re-bind the vars that
;; `defprotocol IMCons` created and call the methods that the MCons type
;; implements directly through Java interop.
;;
;; The argument is type-hinted as MCons so the compiler resolves each method
;; statically; without the hint every call goes through runtime reflection,
;; which is paid on every MVar put/take.

(defn mcar
  "Returns the value in the first slot of the MCons cell `p`."
  [^MCons p]
  (.mcar p))

(defn mcdr
  "Returns the value in the second slot of the MCons cell `p`."
  [^MCons p]
  (.mcdr p))

(defn set-mcar!
  "Overwrites the first slot of the MCons cell `p` with `v`."
  [^MCons p v]
  (.set-mcar! p v))

(defn set-mcdr!
  "Overwrites the second slot of the MCons cell `p` with `v`."
  [^MCons p v]
  (.set-mcdr! p v))
(defn empty-mvar
  "Creates an MVar that holds no value yet.

  The MVar is a cell shaped as (value . (take-semaphore . put-semaphore)).
  The take semaphore has its single permit acquired up front, so a taker
  blocks until something is put; the put semaphore keeps its permit, so the
  first put proceeds immediately."
  []
  (let [takers (doto (java.util.concurrent.Semaphore. 1)
                 (.acquire))
        putters (java.util.concurrent.Semaphore. 1)
        semaphores (mcons takers putters)]
    (mcons nil semaphores)))
(defn put-mvar
  "Stores `val` in `mvar`, blocking while the MVar is still full.

  Acquires the put semaphore, writes the value slot, then releases the take
  semaphore so that one waiting taker can proceed."
  [mvar val]
  (let [semaphores (mcdr mvar)
        filled (mcar semaphores)   ; take semaphore
        vacant (mcdr semaphores)]  ; put semaphore
    (.acquire vacant)
    (set-mcar! mvar val)
    (.release filled)))
(defn take-mvar
  "Removes and returns the value held by `mvar`, blocking while it is empty.

  Acquires the take semaphore, reads the value slot, resets the slot to nil,
  and releases the put semaphore so that one waiting putter can proceed."
  [mvar]
  (let [semaphores (mcdr mvar)
        filled (mcar semaphores)   ; take semaphore
        vacant (mcdr semaphores)]  ; put semaphore
    (.acquire filled)
    (let [taken (mcar mvar)]
      ;; clear the slot before letting the next putter in
      (set-mcar! mvar nil)
      (.release vacant)
      taken)))
(defn new-mvar
  "Creates an MVar that already holds `val`."
  [val]
  (doto (empty-mvar)
    (put-mvar val)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; | |
;; Monad implementation of Observable for fancy composition | |
;; Monad instance for rx Observable:
;;   do-result wraps a plain value in a single-item Observable;
;;   bind feeds each emitted item to `f` through flatMap.
(extend-protocol Monad
  Observable
  (do-result [_ value]
    (Observable/just value))
  (bind [source f]
    ;; rxi/fn* adapts the Clojure function for the flatMap call
    (let [step (rxi/fn* f)]
      (.flatMap source step))))
(defn observable-m
  "Wraps `v` in an Observable built with `Observable/just`."
  [v]
  (. Observable (just v)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; | |
;; App state | |
;; Application state record.
;;   http-scheduler - the rx scheduler used to run HTTP requests.
;; The field name matches the :http-scheduler key that this file uses both
;; when building the record and when reading the scheduler back; the former
;; name (http-pool) was never populated by that code.
(defrecord AppState [http-scheduler])
(defn new-app-state
  "Returns an atom holding a fresh AppState whose :http-scheduler is an rx
  scheduler backed by a fixed pool of 10 threads."
  []
  (let [;; thread pool dedicated to HTTP requests
        http-threads (Executors/newFixedThreadPool 10)
        http-scheduler (Schedulers/from http-threads)]
    (-> {:http-scheduler http-scheduler}
        map->AppState
        atom)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; | |
;; Async observable that gets results from Google | |
(defn google-query
  "Returns an Observable that, when subscribed, performs an HTTP GET against
  Google for the query `q`, emits the response body once and then completes.

  Arities:
    [q]           - the request runs on whichever thread subscribes.
    [scheduler q] - same Observable, subscribed on `scheduler`.

  `q` is URL-encoded (UTF-8) before being placed in the query string, so
  queries containing spaces, `&`, `#` or non-ASCII characters produce a
  well-formed URL."
  ([scheduler q]
   (.subscribeOn (google-query q) scheduler))
  ([q]
   (rx/observable*
    (fn google-query-observable [^rx.Subscriber observer]
      (let [encoded-q (java.net.URLEncoder/encode (str q) "UTF-8")
            url (str "http://google.com?q=" encoded-q)]
        ;; blocking request: the body is emitted as the single item
        (rx/on-next observer (-> url http/get :body))
        (rx/on-completed observer))))))
;; Ugly way to do it using flatMap (a.k.a bind) | |
(defn both-results
  "Runs the \"haskell\" query and then the \"clojure\" query on `scheduler`,
  chaining them with explicit flatMap calls. The resulting Observable emits
  one vector: [haskell-body clojure-body]."
  [scheduler]
  (-> (google-query scheduler "haskell")
      (.flatMap
       (rxi/fn [haskell-body]
         ;; the second query is only created once the first has emitted
         (-> (google-query scheduler "clojure")
             (.flatMap
              (rxi/fn [clojure-body]
                (Observable/just [haskell-body clojure-body]))))))))
;; Using monadic API | |
;; Same two queries as both-results, written with the monadic `do` macro.
;; The body yields a three-element vector: the "haskell" result, a separator
;; string, and the "clojure" result.
;; NOTE(review): presumably monadic/do chains the bindings through the Monad
;; implementation for Observable and wraps the body with observable-m --
;; confirm against monads.macros.
(defn both-results-m [scheduler]
  (monadic/do observable-m
    ;; each binding names the value emitted by the corresponding query
    [haskell (google-query scheduler "haskell")
     clojure (google-query scheduler "clojure")]
    [haskell "\n=======\n" clojure]))
;; Main function | |
(defn run-example
  "Builds the app state, subscribes to the combined Google queries and blocks
  the calling thread until a result arrives.

  Returns whatever was handed to the MVar: the emitted body on success, or
  the error object if the Observable fails. Prints \"DONE!\" when the
  Observable completes."
  []
  (let [result (empty-mvar)
        app-state (new-app-state)
        bg-observable (both-results-m (:http-scheduler @app-state))]
    (letfn [;; runs on a scheduler thread; the MVar hands the value back to
            ;; the thread that is blocked in take-mvar below
            (my-on-next [body] (put-mvar result body))
            ;; failures are delivered through the same MVar
            (my-on-error [err] (put-mvar result err))
            (my-on-complete [] (println "DONE!"))]
      (rx/subscribe bg-observable my-on-next my-on-error my-on-complete))
    ;; blocks here until one of the callbacks fills the MVar
    (take-mvar result)))
;; Pending: decide how Observable failures should be handled
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment