Skip to content

Instantly share code, notes, and snippets.

@snoyberg
Created March 12, 2014 04:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save snoyberg/9500916 to your computer and use it in GitHub Desktop.
Save snoyberg/9500916 to your computer and use it in GitHub Desktop.
{-# LANGUAGE FlexibleContexts #-}
import Control.Monad
import Control.Exception
import Control.Concurrent hiding (yield)
import Control.Monad.Trans.Control (control, MonadBaseControl)
import qualified Control.Concurrent.Lifted as L
import Pipes
import Pipes.Concurrent
-- | Combine several producers into one.
--
-- A single unbounded mailbox is spawned; every producer is started on its own
-- thread via 'fork', writing into that mailbox, and the resulting producer
-- re-emits whatever is read back out of it.
--
-- An empty list yields nothing and finishes immediately.
merge :: (MonadIO m, MonadBaseControl IO m) => [Producer a m ()] -> Producer a m ()
-- NOTE(review): without this clause no thread would ever hold the 'Output', so
-- the reader below would presumably only finish once the garbage collector
-- notices that and seals the mailbox -- confirm against pipes-concurrency.
merge [] = return ()
merge producers = do
    (output, input) <- liftIO $ spawn Unbounded
    -- One writer thread per producer, all sharing the same mailbox.
    lift $ mapM_ (fork output) producers
    fromInput input
-- | Start a new thread that drains @producer@ into @output@.
--
-- Returns the 'ThreadId' of the spawned thread. The thread pipes every value
-- the producer yields into the mailbox and, once that pipeline has stopped,
-- requests a garbage collection.
fork :: (MonadBaseControl IO m, MonadIO m) => Output a -> Producer a m () -> m ThreadId
fork output producer = L.fork worker
  where
    worker = do
        runEffect (producer >-> toOutput output)
        -- NOTE(review): presumably here so that this writer's reference to
        -- the mailbox is collected promptly once it is done -- confirm.
        liftIO performGC
-- | Demo entry point: merge two endlessly repeating producers that emit at
-- different rates and print the first 20 values that come through.
main :: IO ()
main = runEffect $ merged >-> taker 20
  where
    -- The second argument of 'repeater' is passed to 'threadDelay', which
    -- measures its delay in microseconds.
    merged :: Producer Int IO ()
    merged = merge [repeater 1 (100 * 1000), repeater 2 (150 * 1000)]

    -- Emit @val@ forever, sleeping for @delay@ before each emission.
    repeater :: Int -> Int -> Producer Int IO r
    repeater val delay = forever $ do
        lift $ threadDelay delay
        yield val

    -- Print the next @n@ values received from upstream, counting down, then
    -- stop. A non-positive count consumes nothing.
    taker :: Int -> Consumer Int IO ()
    taker n
        | n <= 0 = return ()
        | otherwise = do
            val <- await
            liftIO $ putStrLn $ "Taker " ++ show n ++ ": " ++ show val
            taker (n - 1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment