Skip to content

Instantly share code, notes, and snippets.

@leonoel
Last active October 30, 2022 01:25
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save leonoel/4b7e8c26ffdbfb4e4ffcb79aaf9dccfa to your computer and use it in GitHub Desktop.
Save leonoel/4b7e8c26ffdbfb4e4ffcb79aaf9dccfa to your computer and use it in GitHub Desktop.
Parallel processing
;; Experiment - parallel processing using missionary primitives.
;; Inspired by Rx's parallel 'rails' :
;; http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/parallel/ParallelFlowable.html
;; https://dzone.com/articles/rxjava-idiomatic-concurrency-flatmap-vs-parallel
(ns parallel-processing
(:require [missionary.core :as m]))
(defn map-task [f >x]
(m/ap (m/? (f (m/?> >x)))))
(defmacro do-after [& body]
`(fn [rf#]
(fn ([r#] ~@body (rf# r#))
([r# x#] (rf# r# x#)))))
(defn poll [t]
(m/ap (m/? (m/?> (m/seed (repeat t))))))
(defn parallel
"Process values of flow >x with n parallel instances of pipeline f."
[n f >x]
(assert (pos? n))
(m/ap
(let [r (m/rdv) d (m/dfv) c (m/race r d)]
(m/?> (m/?= (m/seed (cons (->> >x
(map-task r)
(m/eduction (filter {}) (do-after (d c))))
(repeat n
(->> (poll c)
(m/eduction (take-while (complement #{c})))
(f))))))))))
;; returns 499500 in 2 seconds
(m/? (->> (m/seed (range 1000))
(parallel 50 #(m/ap (m/? (m/sleep 100 (m/?> %)))))
(m/reduce +)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment