Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
Manifold examples
(ns hello-manifold.core
(:require
[aleph.http :as http]
[byte-streams :as bs]
[cheshire.core :as json]
[clojure.string :as str]
[manifold.deferred :as d]
[manifold.executor :as e]
[manifold.stream :as s]))
;;; ROADMAP
;; -
;; Utils (ignore me plz)
(defn where-am-i?
"Returns the friendly name of the current java thread."
[]
(.getName (Thread/currentThread)))
(let [safe-printer (agent nil)]
(defn log
"Does a println but also prints the current thread."
[& args]
(let [thread (where-am-i?)]
(send safe-printer
(fn [_]
(println (str/join " " (list*
(str "[" (mod (System/currentTimeMillis) 100000) "]")
(str "[" thread "]\t")
args)))))
nil)))
(defn inc*
"A fake expensive computation. Sleeps for 1 second, then increments input."
[x]
(log "inc'ing" x)
(Thread/sleep 1000)
(let [result (inc x)]
(log "=>" result)
result))
(comment
;;; PROMISES
;; A promise is a box that can be "realized" or not. API:
;; - (promise) - creates a promise
;; - (realized? p) - has it has been delivered?
;; - (deliver p value) - delivers a value, or does nothing if a value already exists
;; - (deref p) or @p - blocks the current thread until delivered, then returns that value
;;; FUTURES
;; A future is an asynchronous computation that can be manipulated like a promise. API:
;; - (future ...) - creates a future
;; - (realized? f) - has the computation completed?
;; - (deref p) or @p - blocks until the computation completes
;;; DEFERREDS
;; A deferred is a box with two slots: "success" and "error" state. Core API:
;; - (d/deferred) - creates a deferred
;; - (realized? d) - has it been realized with either error or success?
;; - (d/success! d val) - delivers a success value
;; - (d/error! d val) - delivers an error value
;; - (deref d) - blocks until success value (and returns it) or error value (and throws it)
;; - (d/on-realized d success-callback error-callback) - sets callbacks (super useful!)
;;; DEFERRED FUTURES
;; A shortcut to creating an async computation that dumps into a deferred.
;; - (d/future ...)
;; - Automatic error handling
;; - Runs on an unbounded thread pool
;; How to chain computations together?
;; Naive way (the only way you can do it with futures)
(as-> (future 1) f
(future (inc* @f))
(future (inc* @f))
(future (inc* @f))
(future (inc* @f))
(log "=>" @f))
;; deferred's true callbacks unlock the more powerful `d/chain`
(as-> (d/future (inc* 1)) f
(d/chain f inc* inc* inc* inc*)
(log "=>" @f))
(comment ;; Diagram for the above `d/chain` call:
D1 = 1 = << 1 >>
D2 = (inc D1) = << 2 >>
D3 = (inc D2) = << 3 >>
D4 = (inc D3) = << 4 >>
D5 = (inc D4) = << 5 >>)
;; return values to functions in `d/chain` may have as many additional
;; layers of deferredness as you want, which are automatically
;; swallowed into the overall deferred
(as-> (d/future (inc* 1)) f
(d/chain f (fn [x] (d/future (d/future (d/future (inc* x))))))
(log "=>" @f))
;; `d/chain` also magically treats non-deferred as deferred
(d/chain 1 inc*)
(comment
;; equivalent to
(d/chain (d/success-deferred 1) inc*)
)
;; What happens when an error is thrown in the middle of the chain?
(as-> (d/future 1) f
(d/chain f inc* inc* #(/ % 0) inc* inc*)
(log "=>" @f))
(comment
D1 = 1 = << 1 >>
D2 = (inc D1) = << 2 >>
D3 = (inc D2) = << 3 >>
D4 = (/ D3 0) = << ERROR division by zero >>
D5 = (inc D4) = << ERROR division by zero >>
D6 = (inc D5) = << ERROR division by zero >>)
;; Introducing `d/catch`
(as-> (d/future (Thread/sleep 100) 1) f
(d/chain f inc* inc* (fn [x] (/ x 0)) inc* inc*)
(d/catch f Exception (fn [e]
(log "Got exception" e)
:error))
(log "=>" @f))
(comment ;; Diagram for the above `d/chain` call:
D1 = 1 = << 1 , nil >>
D2 = (inc D1) = << 2 , nil >>
D3 = (inc D2) = << 3 , nil >>
D4 = (/ D3 0) = << nil , ERROR division by zero >>
D5 = (inc D4) = << nil , ERROR division by zero >>
D6 = (inc D5) = << nil , ERROR division by zero >>
D7 = (try D6
(catch Exception e
(log "Got exception" e)
:error))
= << :error , nil >>)
;; Introducing `d/zip`
(let [d (d/zip
(d/chain (d/onto (d/future (Thread/sleep 200) 0) fixed) inc* inc*)
(d/chain (d/onto (d/future (Thread/sleep 200) 10) fixed) inc* inc*)
(d/chain (d/onto (d/future (Thread/sleep 200) 100) fixed) inc* inc*))]
(log @d))
(comment
D1 = 0 = << 0 >>
D2 = (inc D1) = << 1 >>
D3 = (inc D2) = << 2 >>
D4 = 10 = << 10 >>
D5 = (inc D4) = << 11 >>
D6 = (inc D5) = << 12 >>
D7 = 100 = << 100 >>
D8 = (inc D7) = << 101 >>
D9 = (inc D8) = << 102 >>
D10 = [D3 D6 D9] = << (2 12 112) >>
;; Network
D1 → D2 → D3 ↘
D4 → D5 → D6 → D10
D7 → D8 → D9 ↗
)
)
;; More concrete example
(def base-url "http://jsonplaceholder.typicode.com")
(defn decode-body
[resp]
(-> resp
:body
slurp
(json/parse-string true)))
;; Not using manifold
(defn fetch
[relative-path]
(log "Fetching" relative-path)
(decode-body @(http/get (str base-url relative-path))))
(defn posts-for-user-expanded
[user-id]
(->> (fetch (str "/posts?userId=" user-id))
(mapv (fn [{:keys [id] :as post}]
(assoc post :comments
(->> (fetch (str "/posts/" id "/comments"))
(mapv (juxt :email :body))))))))
(defn all-users-expanded
[]
(->> (fetch "/users")
(mapv (fn [{:keys [id] :as user}]
(assoc user :posts
(posts-for-user-expanded id))))))
(comment
(time (puget.printer/pprint
(all-users-expanded))))
#_(def web-coordinator (e/fixed-thread-executor 2))
;; Using manifold and parallelism
(defn fetch-async
[relative-path]
(d/chain
(http/get (str base-url relative-path))
(fn [result] (log "Fetched" relative-path) result)
decode-body))
(defn posts-for-user-expanded-async
[user-id]
(d/chain
(fetch-async (str "/posts?userId=" user-id))
(fn [posts]
(apply d/zip
(map (fn [{:keys [id] :as post}]
(d/chain
(fetch-async (str "/posts/" id "/comments"))
(partial mapv (juxt :email :body))
(partial assoc post :comments)))
posts)))))
(defn all-users-expanded-async
[]
(d/chain
(fetch-async "/users")
(fn [users]
(apply d/zip
(map (fn [{:keys [id] :as user}]
(d/chain
(posts-for-user-expanded-async id)
(partial assoc user :posts)))
users)))))
;;; LET-FLOW
(comment
(d/let-flow [users (fetch-async "/users")
user (first users)
user-id (:id user)
posts (fetch-async (str "/posts?userId=" user-id))
post (first posts)
post-id (:id post)
comments (fetch-async (str "/posts/" post-id "/comments"))]
{:user user
:post post
:comments comments})
;; equivalent to
)
(defn stats-callback
[stats]
(prn stats))
(defn stats-callback2
[stats]#_
(prn "stats" stats))
(comment
(e/register-execute-pool-stats-callback #'stats-callback2)
(def fixed (e/fixed-thread-executor 5 {:stats-callback #'stats-callback})))
;;; TAKEAWAYS
;; - Most `d/*` functions, unless explicitly specified, do not create new threads,
;; they just pile work onto existing threads.
;; - So make sure you understand what thread something will run on!
;; - But when explicitly specified (`d/future`, `d/onto`, etc), you get control over
;; those threads / threadpools.
;; - Doesn't hurt to wrap stuff in `d/future` when in doubt
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment