Skip to content

Instantly share code, notes, and snippets.

@roman
Last active August 29, 2015 14:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save roman/66649bd8861375296425 to your computer and use it in GitHub Desktop.
Save roman/66649bd8861375296425 to your computer and use it in GitHub Desktop.
Naive approach for Bg process handling using Rx + Clojure
(ns clj-playground.background
(:require [rx.lang.clojure.core :as rx]
[rx.lang.clojure.interop :as rxi]
[clj-http.client :as http]
[monads.core :refer [Monad]]
[monads.macros :as monadic])
(:import
[java.util.concurrent Executors]
[rx Observable]
[rx.schedulers Schedulers]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Util implementation of Haskell MVar for syncing threads
(defprotocol IMCons
(mcar [p])
(mcdr [p])
(set-mcar! [p val])
(set-mcdr! [p val]))
(deftype MCons [^{:volatile-mutable true} car ^{:volatile-mutable true} cdr]
IMCons
(mcar [this] car)
(mcdr [this] cdr)
(set-mcar! [this val]
(set! car val))
(set-mcdr! [this val]
(set! cdr val)))
(defn mcons [a b]
(MCons. a b))
(defn mcar [p] (.mcar p))
(defn mcdr [p] (.mcdr p))
(defn set-mcar! [p v] (.set-mcar! p v))
(defn set-mcdr! [p v] (.set-mcdr! p v))
(defn empty-mvar []
(let [take-sem (java.util.concurrent.Semaphore. 1)
put-sem (java.util.concurrent.Semaphore. 1)]
(.acquire take-sem)
(mcons nil (mcons take-sem put-sem))))
(defn put-mvar [mvar val]
(let [take-sem (mcar (mcdr mvar))
put-sem (mcdr (mcdr mvar))]
(.acquire put-sem)
(set-mcar! mvar val)
(.release take-sem)))
(defn take-mvar [mvar]
(let [take-sem (mcar (mcdr mvar))
put-sem (mcdr (mcdr mvar))]
(.acquire take-sem)
(let [val (mcar mvar)]
(set-mcar! mvar nil)
(.release put-sem)
val)))
(defn new-mvar [val]
(let [mvar (empty-mvar)]
(put-mvar mvar val)
mvar))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Monad implementation of Observable for fancy composition
(extend-protocol Monad
Observable
(do-result [_ x]
(Observable/just x))
(bind [mv f]
(.flatMap mv (rxi/fn* f))))
(defn observable-m [v]
(Observable/just v))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; App state
(defrecord AppState [http-pool])
(defn new-app-state []
(atom
;; create a Thread Pool for HTTP requests
(map->AppState
{:http-scheduler (Schedulers/from (Executors/newFixedThreadPool 10))})))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Async observable that gets results from Google
(defn google-query
([scheduler q]
(.subscribeOn (google-query q) scheduler))
([q]
(rx/observable*
(fn google-query-observable [^rx.Subscriber observer]
(let [url (str "http://google.com?q=" q)]
(rx/on-next observer (-> url http/get :body))
(rx/on-completed observer))))))
;; Ugly way to do it using flatMap (a.k.a bind)
(defn both-results [scheduler]
(.flatMap
(google-query scheduler "haskell")
(rxi/fn [haskell-result]
(.flatMap
(google-query scheduler "clojure")
(rxi/fn [clojure-result]
(Observable/just [haskell-result clojure-result]))))))
;; Using monadic API
(defn both-results-m [scheduler]
(monadic/do observable-m
[haskell (google-query scheduler "haskell")
clojure (google-query scheduler "clojure")]
[haskell "\n=======\n" clojure]))
;; Main function
(defn run-example []
(let [app-state (new-app-state)
scheduler (:http-scheduler @app-state)
bg-observable (both-results-m scheduler)
result (empty-mvar)]
(rx/subscribe
bg-observable
(fn my-on-next [body]
;; this is executed on a different thread
;; so we are going to sync with main thread
;; via MVar
(put-mvar result body))
(fn my-on-error [err]
(put-mvar result err))
(fn my-on-complete [] (println "DONE!")))
;; this will unblock when the MVar is filled
(take-mvar result)))
;; Pending: Observable failure and how to handle
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment