@mithrandi
Last active August 29, 2015 14:18
Version of map handling deferreds/errors
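(ns clj-amp.core
  (:require [manifold.deferred :as d]
            [manifold.stream :as s]))

(defn- map'
  "Like manifold.stream/map, except that it handles deferred messages and
  closes the result stream if an error occurs."
  ([f s]
   (let [s' (s/stream)]
     (s/connect-via
      s
      (fn [msg]
        (-> msg
            ;; msg may be a plain value or a deferred; d/chain handles both,
            ;; applies f to the realized value, and puts the result onto s'.
            (d/chain
             f
             #(s/put! s' %))
            ;; If msg is an errored deferred or f throws, close s' and
            ;; yield the error as the callback's result.
            (d/catch
             (fn [e]
               (s/close! s')
               e))))
      s'
      {:description {:op "map"}})
     (s/source-only s')))
  ;; Multiple streams: zip them into tuples and apply f to each tuple.
  ([f s & rest]
   (map' #(apply f %)
         (apply s/zip s rest))))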
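
;; A minimal usage sketch, not part of the original gist. It assumes it is
;; evaluated at a REPL inside clj-amp.core (map' is private); `in` and `out`
;; are hypothetical names chosen for illustration.
(comment
  (def in (s/stream))
  (def out (map' inc in))

  ;; A deferred message should be realized before inc is applied to it.
  (s/put! in (d/success-deferred 1))
  @(s/take! out)

  ;; inc throws on a string, so the error handler should close the result
  ;; stream; a take! with a default value should then yield that default.
  (s/put! in "not a number")
  @(s/take! out ::closed))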
@mithrandi
Author

I originally pasted an old, buggy version of the code; it is now updated with the proper error-close logic.
