Skip to content

Instantly share code, notes, and snippets.

@ericnormand
Created November 30, 2020 15:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ericnormand/df464178775bf80ce3214bfde707396f to your computer and use it in GitHub Desktop.
Save ericnormand/df464178775bf80ce3214bfde707396f to your computer and use it in GitHub Desktop.
405 - PurelyFunctional.tv Newsletter

Sequence and parallel combinators

I've been re-reading the Lambda, the ultimate... papers again for my podcast. These papers show how functions and function calls can model many of the imperative constructs in programming languages. One practical application of that is to build a compiler whose main construct is the function call, which is a common way to implement Scheme.

In this challenge, we'd like to make constructs for two kinds of execution: sequential and parallel.

Imagine we had actions a and b, which are functions which depend on when they are called. If we want them to run in order, meaning a completes before b begins, we can create a new action a>b, like so:

(def a>b (sequential a b))

We can then call (a>b) to run them in the right order.

If we want them to run in parallel, we can similarly write:

(def a-b (parallel a b))

Then when we call (a-b), they will run "at the same time".

Your task is to write sequential and parallel, using threads (or some other construct) if needed.

Bonus: devise a way to have return values that you can block on. sequential should act like do and return the return value of the last argument. parallel should return both return values.

Please submit your solutions as comments on this gist.

@cassc
Copy link

cassc commented Dec 2, 2020

pmap and friends limit the number of threads, but using future directly seems to give the desired behaviour:

You are right, I believe that's caused by the fact that pmap / pcalls both partitions the input sequence into batches with size equals the number of CPU cores plus 2, tasks won't run until the previous batch is consumed.

@KingCode
Copy link

KingCode commented Dec 3, 2020

Since my solutions are all blocking by default, I added as a "reverse bonus" an asynchronous wrapper, but maybe I missed it altogether?

(defn sequential 
  ([a b]
   (fn [& xs]
     (do (apply a xs)
         (apply b xs))))
  ([a b & more]
   (if (empty? more) 
     (sequential a b)
     (sequential a (apply sequential b more)))))

(require '[clojure.core.reducers :refer [fold] :as r])

(defn parallel 
  ([a b]
   (fn [& xs]
     (->> [a b]
          (map #(future (apply % xs)))
          (mapv deref))))
  ([a b & more]
   (->> (list* a b more)
        (fold (fn 
                ([] nil)
                ([pf1 pf2] 
                 (if (nil? pf1)
                   pf2
                   (fn [& xs]
                     (->> xs
                          (apply (parallel pf1 pf2))
                          flatten vec)))))))))

;; Test harness for sequential accuracy test
(defn ->sequential-results-ordered
  "Transforms each of 'fns into a fn which appends its result to a vector
   of previous invocation results, and returns a sequential of the new 
  functions. The return function's return value is the result of each of fns's 
  application by call order. an easy way of verifying a sequential's 
  component functions' call order.
  "
[& fns] 
  (let [a (atom [])
        app (->> fns butlast 
                 (mapv (fn [f] #(swap! a conj (apply f %&))))
                 ((fn [fs]  (conj fs #(conj @a (apply (last fns) %&)))))
                 (apply sequential))]
    (fn [& xs]
      (apply app xs))))

;; accuracy tests
(assert (= [9, 24]
           ((->sequential-results-ordered + *) 2 3 4)))

(assert (= [9, 24, -5]
           ((->sequential-results-ordered + * -) 2 3 4)))

(assert (= [9, 24, -5, 1/6]
           ((->sequential-results-ordered + * - /) 2 3 4)))

(assert (= [9, 24] 
           ((parallel + *) 2 3 4)))

(assert (= [9, 24, -5] 
           ((parallel + * -) 2 3 4)))

(assert (= [9, 24, -5, 1/6] 
           ((parallel + * - /) 2 3 4))) 

;; concurrent/parallel vs sequential comparison 
(require '[criterium.core :refer [quick-bench]])

;; test harness 
(defn make-slow [sleeptime combinator & fns]
  (->> fns
       (mapv #(fn [& xs] (Thread/sleep sleeptime) 
                (apply % xs)))
       (apply combinator)))

(def slow-sequential (make-slow 500 sequential + * -))
(def slow-parallel (make-slow 500 parallel + * -))

(quick-bench (slow-sequential 2 3 4 5))

;; Evaluation count : 6 in 6 samples of 1 calls.
             ;; Execution time mean : 1.501755 sec
    ;; Execution time std-deviation : 1.101675 ms
   ;; Execution time lower quantile : 1.500706 sec ( 2.5%)
   ;; Execution time upper quantile : 1.503036 sec (97.5%)
                   ;; Overhead used : 4.400701 ns

(quick-bench (slow-parallel 2 3 4 5))

;; Evaluation count : 6 in 6 samples of 1 calls.
             ;; Execution time mean : 501.092740 ms
    ;; Execution time std-deviation : 361.796937 µs
   ;; Execution time lower quantile : 500.521119 ms ( 2.5%)
   ;; Execution time upper quantile : 501.412429 ms (97.5%)
                   ;; Overhead used : 4.400701 ns

;; bonus: make anything (including a combinator) asynchronous
(defn async [f]
  (fn [& xs]
    (future (apply f xs))))

(assert (= [9 24] @((async (->sequential-results-ordered + *)) 2 3 4)))

(time ((async (make-slow 100000000 sequential + *)) 2 3 4))
;; "Elapsed time: 0.089478 msecs"
;; #<Future@669bfd44: :pending>

(time ((async (make-slow 100000000  parallel + *)) 3 4 5))
;; "Elapsed time: 0.059211 msecs"
;; #<Future@6e0eeeab: :pending>

(EDITS: added varargs version & tests for sequential and concurrenty comparison tests, then asynchronous wrapper)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment