Skip to content

Instantly share code, notes, and snippets.

@dgendill
Last active March 11, 2017 03:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dgendill/a943d277b818cf45cd8e62376c2eb0b8 to your computer and use it in GitHub Desktop.
Save dgendill/a943d277b818cf45cd8e62376c2eb0b8 to your computer and use it in GitHub Desktop.
Multiple Aff Consumers Connected to a Producer

Connecting many Coroutine Consumers to a Producer

The purescript-coroutines library provides some base functions for working with coroutines. Using joinMany, you can join multiple Consumers so they can listen to a Producer.

Some open questions that I'm still working on...

  1. Would you ever need to join this many consumers? Is this a design pattern you should entertain?
  2. Do sequential actions in the Aff monad naturally lead to "RangeError: Maximum call stack size exceeded" and is there a way to avoid it?
module MultipleConsumerCoroutine where
import Prelude
import Data.List as List
import Data.List.NonEmpty as NEList
import Control.Coroutine (Consumer, Producer, Transformer, await, connect, emit, joinConsumers, runProcess, transform, transformConsumer)
import Control.Monad.Aff (Aff, launchAff)
import Control.Monad.Aff.Console (log, CONSOLE)
import Control.Monad.Eff (Eff)
import Control.Monad.Eff.Exception (EXCEPTION)
import Control.Monad.Rec.Class (class MonadRec, Step(Done, Loop), forever, tailRecM2)
import Control.Monad.Trans.Class (lift)
import Control.Parallel (class Parallel)
import Data.Array (range)
import Data.List.Types (NonEmptyList)
import Data.Maybe (Maybe(Just, Nothing))
import Data.Tuple (Tuple(..))
main :: Eff (err :: EXCEPTION, console :: CONSOLE) Unit
main = void $ launchAff $ do
-- -- Note, if you increase the number of consumers to be joined, you will get a
-- -- RangeError: Maximum call stack size exceeded error. I have yet to find the
-- -- cause of this or the way to avoid this.
-- let consumers' = ((map >>> map) (const logger) (NEList.fromFoldable (range 0 1000)))
let consumerList = ((map >>> map) (\i -> consumeLT i) (NEList.fromFoldable (range 0 10)))
case consumerList of
Nothing -> pure unit
Just consumers' -> do
consumers <- (joinMany consumers')
runProcess (connect
numberProducer
consumers
)
tupleTransform :: forall m a. (Monad m) => Transformer a (Tuple a a) m Unit
tupleTransform = transform (\i -> Tuple i i)
joinMany :: forall a m par. (Monad m, MonadRec m, Parallel par m) => NonEmptyList (Consumer a m Unit) -> m (Consumer a m Unit)
joinMany ac = tailRecM2 go (NEList.head ac) (NEList.tail ac)
where
go h t = case (List.head t) of
(Just thead) -> do
let hh = h
let newConsumer = (transformConsumer (forever $ tupleTransform) (joinConsumers hh thead))
case (List.tail t) of
(Just ttail) -> do
pure (Loop { a : newConsumer, b : ttail })
_ -> pure $ Done newConsumer
_ -> pure $ Done h
consumeLT :: forall e a. (Ord a, Show a) => a -> Consumer a (Aff (console :: CONSOLE | e)) Unit
consumeLT max = forever $ do
s <- await
case s < max of
true -> do
lift $ log $ (show s) <> " LT " <> (show max)
false -> pure unit
numberProducer :: forall m. (Monad m) => Producer Int m Unit
numberProducer = go 0
where
go i = do
emit i
go (i + 1)
0 LT 1
0 LT 2
0 LT 3
0 LT 4
0 LT 5
0 LT 6
0 LT 7
0 LT 8
0 LT 9
0 LT 10
1 LT 2
1 LT 3
1 LT 4
1 LT 5
1 LT 6
1 LT 7
1 LT 8
1 LT 9
1 LT 10
2 LT 3
2 LT 4
2 LT 5
2 LT 6
2 LT 7
2 LT 8
2 LT 9
2 LT 10
3 LT 4
3 LT 5
3 LT 6
3 LT 7
3 LT 8
3 LT 9
3 LT 10
4 LT 5
4 LT 6
4 LT 7
4 LT 8
4 LT 9
4 LT 10
5 LT 6
5 LT 7
5 LT 8
5 LT 9
5 LT 10
6 LT 7
6 LT 8
6 LT 9
6 LT 10
7 LT 8
7 LT 9
7 LT 10
8 LT 9
8 LT 10
9 LT 10
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment