405 - PurelyFunctional.tv Newsletter

Sequence and parallel combinators

I've been re-reading the Lambda, the ultimate... papers for my podcast. These papers show how functions and function calls can model many of the imperative constructs in programming languages. One practical application is to build a compiler whose main construct is the function call, which is a common way to implement Scheme.

In this challenge, we'd like to make constructs for two kinds of execution: sequential and parallel.

Imagine we have actions a and b, which are functions whose behavior depends on when they are called. If we want them to run in order, meaning a completes before b begins, we can create a new action a>b, like so:

(def a>b (sequential a b))

We can then call (a>b) to run them in the right order.

If we want them to run in parallel, we can similarly write:

(def a-b (parallel a b))

Then when we call (a-b), they will run "at the same time".

Your task is to write sequential and parallel, using threads (or some other construct) if needed.

Bonus: devise a way to have return values that you can block on. sequential should act like do and return the return value of the last argument. parallel should return both return values.
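
As a rough illustration of the bonus semantics, here is a sketch that uses only built-ins (do, future, deref) rather than the combinators you are asked to write. The actions a and b below are hypothetical stand-ins; any zero-argument functions would do.

```clojure
;; Hypothetical stand-in actions (not part of the challenge text).
(defn a [] (Thread/sleep 100) :a-done)
(defn b [] (Thread/sleep 100) :b-done)

;; Sequential: like `do`, a finishes before b starts,
;; and the value of the last action is returned.
(def seq-result (do (a) (b)))

;; Parallel: start both actions on their own threads,
;; then block until both results are available.
(def par-result
  (let [fa (future (a))
        fb (future (b))]
    [@fa @fb]))
```

A solution would wrap these two patterns in functions that return a new action instead of running immediately.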

Please submit your solutions as comments on this gist.

cassc commented Dec 1, 2020

(defn sequential [func & args]
  (let [[nf & more] args]
    (if nf
      (do
        (func)
        (recur nf more))
      (func))))

;; this is semi-lazy, may or may not be what you want
(defn parallel [& args]
  (pmap #(%) args))

steffan-westcott commented Dec 1, 2020

pmap and friends limit the number of threads, but using future directly seems to give the desired behaviour:

(defn sequential [& thunks]
  (fn []
    (reduce #(%2) nil thunks)))

(defn parallel [& thunks]
  (fn []
    (->> thunks
      (mapv #(future (%)))  ; eagerly get futures, so all thunks start together
      (mapv deref))))       ; eagerly get results, so all thunks are completed before this function returns

Here is a demo showing 50 sequences starting and stopping together:

(defn wait [n]
  (sequential
    #(println "Starting sequence " n)
    #(Thread/sleep 3000)
    #(println "Finished sequence " n)))

(def fifty-seqs-in-parallel
  (apply parallel (map wait (range 50))))

(do
  (println "Starting all sequences")
  (fifty-seqs-in-parallel)
  (println "Finished all sequences"))

Note that the following version of parallel does not start all 50 sequences at the same time:

;; Does not start all thunks at same time, above a certain number
;; of thunks depending on the number of cores in your machine
(defn parallel [& thunks]
  #(doall (apply pcalls thunks)))

Edited: Simplified sequential definition
Edited: Refactored parallel to use (mapv ...) instead of (doall (map ...)) to force evaluation of sequence elements

steffan-westcott commented Dec 1, 2020

My answer is naive as it does not address exception handling, which seems to be a weakly supported area for async Clojure(Script) programming. I wonder if anyone here has experience using missionary? See also this announcement. I think this library deserves some more attention in the community.
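
To make the exception-handling gap concrete, here is a minimal sketch. It assumes the future-based parallel from the comment above; ok, boom, and capturing are hypothetical names introduced only for this illustration.

```clojure
;; Assumption: the future-based `parallel` from the earlier comment.
(defn parallel [& thunks]
  (fn []
    (->> thunks
         (mapv #(future (%)))
         (mapv deref))))

;; Hypothetical thunks: one succeeds, one throws.
(def ok   (fn [] :ok))
(def boom (fn [] (throw (ex-info "boom" {:reason :demo}))))

;; deref rethrows the failure wrapped in an ExecutionException,
;; so one failing thunk hides the results of all the others.
(def outcome
  (try
    ((parallel ok boom))
    (catch java.util.concurrent.ExecutionException e
      [:failed (ex-message (ex-cause e))])))

;; Hypothetical helper: capture exceptions as values so every result survives.
(defn capturing [thunk]
  (fn []
    (try
      {:value (thunk)}
      (catch Throwable t
        {:error t}))))

(def outcomes ((apply parallel (map capturing [ok boom]))))
```

Wrapping each thunk is only one possible design; it does not cancel the remaining thunks when one fails.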

alex-gerdom commented Dec 1, 2020

Struggled with this one, problem is slightly harder than it looks. Issue is that if we're ordering tasks like this, we really need to be thinking about side-effects. And if people haven't hit the issue yet: Laziness and side-effects do not mix well.

Worth thinking about:

  • If you're using pcalls, you're getting a lazy-seq of values. The calls aren't going to start until they're needed (except the first). At least conceptually you expect work for the calls to either be done when you deref the future, or the work to be ongoing and your program to block at that point (at least in cases I can think of). You need something like doall to force side-effects.
  • If you're doing something like an API request using pcalls, how does that play with chunking?
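
The points above about laziness and chunking can be seen in a small sketch. The counter calls and the function effect! are hypothetical names used only for this demonstration, and the chunk size noted in the comments is what current Clojure implementations typically do for a range, not a documented guarantee.

```clojure
(def calls (atom 0))

;; Hypothetical side-effecting function: counts how often it runs.
(defn effect! [x]
  (swap! calls inc)
  x)

;; Building the lazy seq runs nothing yet.
(def pending (map effect! (range 40)))
(def calls-before @calls)

;; Asking for a single element realizes a whole chunk
;; (typically 32 elements for a range), not just one.
(first pending)
(def calls-after-first @calls)

;; doall forces the remaining elements.
(doall pending)
(def calls-after-doall @calls)
```
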
(defn sequential
  "Takes a set of functions and returns a fn that evaluates them sequentially.
   The returned function takes a variable number of arguments, and returns a future for the final value."
  [& funcs]
  (fn [& args]
    (let [bound-funcs (map #(partial apply % args) funcs)
          early-bound-funcs (butlast bound-funcs)
          last-bound-func (last bound-funcs)]
      (future
        ;; Force the evaluation of early steps sequentially for side effects (not retaining head)
        (dorun (map (fn [f] (f)) early-bound-funcs))
        ;; Force evaluation of final step. We only need this value.
        (last-bound-func)))))

(defn parallel
  "Takes a set of functions and returns a fn that evaluates them in parallel.
   The returned function takes a variable number of arguments, and returns a future for the list of values."
  [& funcs]
  (fn [& args]
    (let [bound-funcs (map #(partial apply % args) funcs)]
      (future
        (doall
         (apply pcalls bound-funcs))))))

;; Dummy Examples: Args to functions aren't used
;; Wait macro copied from: https://www.braveclojure.com/concurrency/#Promises
(defmacro wait
  "Sleep `timeout` milliseconds before evaluating body"
  [timeout & body]
  `(do (Thread/sleep ~timeout) ~@body))

(let [a #(wait (* 5 1000) 'a)
      b #(wait (* 5 1000) 'b)
      c #(wait (* 5 1000) 'c)
      a>b>c (sequential a b c)
      a>b>c-instance (a>b>c)] ;; We've started evaluation at this point
  (wait (* 5 1000))           ;; Continue work on main thread for 5 seconds
  (time @a>b>c-instance))     ;; Return 'c in about 10s

(let [a #(wait (* 10 1000) 'a)
      b #(wait (* 10 1000) 'b)
      c #(wait (* 10 1000) 'c)
      a-b-c (parallel a b c)
      a-b-c-instance (a-b-c)]    ;; We've started evaluation at this point
  (wait (* 5 1000)               ;; Continue work on main thread
        (time @a-b-c-instance))) ;; Returns the list (a b c) after ~5s

cassc commented Dec 2, 2020

> pmap and friends limit the number of threads, but using future directly seems to give the desired behaviour:

You are right. I believe that's because pmap and pcalls only keep a limited number of tasks running ahead of the consumer (roughly the number of CPU cores plus 2), so later tasks won't start until earlier results have been consumed.
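
A rough way to observe this is to count how many tasks have started after taking only the first result. This is a sketch: started, slow-task, and total are hypothetical names, and the exact count seen with pmap depends on the number of cores and on chunking of the input sequence.

```clojure
(def started (atom 0))

;; Hypothetical task: records that it started, then sleeps.
(defn slow-task [_]
  (swap! started inc)
  (Thread/sleep 200)
  :done)

(def total 500)

;; Take only the first result; pmap has started just a window of tasks.
(def results (pmap slow-task (range total)))
(first results)
(def started-by-pmap @started)

(reset! started 0)

;; Using future directly, every task is submitted up front.
(def futures (mapv #(future (slow-task %)) (range total)))
(run! deref futures)
(def started-by-futures @started)
```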

KingCode commented Dec 3, 2020

Since my solutions are all blocking by default, I added as a "reverse bonus" an asynchronous wrapper, but maybe I missed it altogether?

(defn sequential 
  ([a b]
   (fn [& xs]
     (do (apply a xs)
         (apply b xs))))
  ([a b & more]
   (if (empty? more) 
     (sequential a b)
     (sequential a (apply sequential b more)))))

(require '[clojure.core.reducers :refer [fold] :as r])

(defn parallel 
  ([a b]
   (fn [& xs]
     (->> [a b]
          (map #(future (apply % xs)))
          (mapv deref))))
  ([a b & more]
   (->> (list* a b more)
        (fold (fn 
                ([] nil)
                ([pf1 pf2] 
                 (if (nil? pf1)
                   pf2
                   (fn [& xs]
                     (->> xs
                          (apply (parallel pf1 pf2))
                          flatten vec)))))))))

;; Test harness for sequential accuracy test
(defn ->sequential-results-ordered
  "Transforms each of `fns` into a fn which appends its result to a vector
   of previous invocation results, and returns a sequential of the new
   functions. The returned function's return value is the vector of results
   of applying each of `fns`, in call order: an easy way of verifying the
   call order of a sequential's component functions."
[& fns] 
  (let [a (atom [])
        app (->> fns butlast 
                 (mapv (fn [f] #(swap! a conj (apply f %&))))
                 ((fn [fs]  (conj fs #(conj @a (apply (last fns) %&)))))
                 (apply sequential))]
    (fn [& xs]
      (apply app xs))))

;; accuracy tests
(assert (= [9, 24]
           ((->sequential-results-ordered + *) 2 3 4)))

(assert (= [9, 24, -5]
           ((->sequential-results-ordered + * -) 2 3 4)))

(assert (= [9, 24, -5, 1/6]
           ((->sequential-results-ordered + * - /) 2 3 4)))

(assert (= [9, 24] 
           ((parallel + *) 2 3 4)))

(assert (= [9, 24, -5] 
           ((parallel + * -) 2 3 4)))

(assert (= [9, 24, -5, 1/6] 
           ((parallel + * - /) 2 3 4))) 

;; concurrent/parallel vs sequential comparison 
(require '[criterium.core :refer [quick-bench]])

;; test harness 
(defn make-slow [sleeptime combinator & fns]
  (->> fns
       (mapv #(fn [& xs] (Thread/sleep sleeptime) 
                (apply % xs)))
       (apply combinator)))

(def slow-sequential (make-slow 500 sequential + * -))
(def slow-parallel (make-slow 500 parallel + * -))

(quick-bench (slow-sequential 2 3 4 5))

;; Evaluation count : 6 in 6 samples of 1 calls.
;;              Execution time mean : 1.501755 sec
;;     Execution time std-deviation : 1.101675 ms
;;    Execution time lower quantile : 1.500706 sec ( 2.5%)
;;    Execution time upper quantile : 1.503036 sec (97.5%)
;;                    Overhead used : 4.400701 ns

(quick-bench (slow-parallel 2 3 4 5))

;; Evaluation count : 6 in 6 samples of 1 calls.
;;              Execution time mean : 501.092740 ms
;;     Execution time std-deviation : 361.796937 µs
;;    Execution time lower quantile : 500.521119 ms ( 2.5%)
;;    Execution time upper quantile : 501.412429 ms (97.5%)
;;                    Overhead used : 4.400701 ns

;; bonus: make anything (including a combinator) asynchronous
(defn async [f]
  (fn [& xs]
    (future (apply f xs))))

(assert (= [9 24] @((async (->sequential-results-ordered + *)) 2 3 4)))

(time ((async (make-slow 100000000 sequential + *)) 2 3 4))
;; "Elapsed time: 0.089478 msecs"
;; #<Future@669bfd44: :pending>

(time ((async (make-slow 100000000  parallel + *)) 3 4 5))
;; "Elapsed time: 0.059211 msecs"
;; #<Future@6e0eeeab: :pending>
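
Since the async wrapper returns a future, the caller can block on it, optionally with a timeout. Here is a small sketch using the three-argument form of deref; slow-sum and the timings are hypothetical and chosen only for illustration.

```clojure
;; Same wrapper as above, repeated so the sketch is self-contained.
(defn async [f]
  (fn [& xs]
    (future (apply f xs))))

;; Hypothetical slow function made asynchronous.
(def slow-sum (async (fn [& xs] (Thread/sleep 500) (apply + xs))))

(def pending (slow-sum 2 3 4))

;; Give up after 50 ms and return a fallback value instead of blocking.
(def early (deref pending 50 :still-running))

;; Block until the result is ready.
(def final @pending)
```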

(EDITS: added varargs version & tests for sequential and concurrency comparison tests, then asynchronous wrapper)
