Skip to content

@snoyberg /concurrent.hs
Created

Embed URL

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.
Download ZIP
{-# LANGUAGE FlexibleContexts #-}
import Control.Monad
import Control.Exception
import Control.Concurrent hiding (yield)
import Control.Monad.Trans.Control (control, MonadBaseControl)
import qualified Control.Concurrent.Lifted as L
import Pipes
import Pipes.Concurrent
-- | Combine several producers into a single producer.
--
-- An 'Unbounded' mailbox is created with 'spawn'; every producer in the
-- list is then started on its own thread via 'fork', writing into the
-- mailbox's output end, and the resulting producer reads back from the
-- mailbox's input end with 'fromInput'.
--
-- NOTE(review): the relative order of values coming from different
-- producers presumably depends on thread scheduling -- confirm against the
-- pipes-concurrent documentation before relying on it.
merge :: (MonadIO m, MonadBaseControl IO m) => [Producer a m ()] -> Producer a m ()
merge producers = do
    (sink, source) <- liftIO (spawn Unbounded)
    -- One worker thread per producer; the returned ThreadIds are discarded.
    lift (forM_ producers (fork sink))
    fromInput source
-- | Run a producer on a new thread (started with 'L.fork'), piping every
-- value it yields into the given 'Output' through 'toOutput'.
--
-- Returns the 'ThreadId' of the forked thread.
--
-- After the pipeline finishes, 'performGC' is invoked on that thread.
-- NOTE(review): this is presumably there so the mailbox end can be
-- reclaimed promptly once the producer is done, as the pipes-concurrent
-- tutorial suggests -- confirm.
fork :: (MonadBaseControl IO m, MonadIO m) => Output a -> Producer a m () -> m ThreadId
fork output producer =
    L.fork (runEffect (producer >-> toOutput output) >> liftIO performGC)
-- | Demo entry point: merges two endless producers and prints the first
-- 20 values that arrive on the merged stream.
--
-- The first producer emits @1@ after each 100000-microsecond delay and the
-- second emits @2@ after each 150000-microsecond delay.
main :: IO ()
main = runEffect $ producer >-> taker 20
  where
    -- The merged stream fed to the consumer.
    producer :: Producer Int IO ()
    producer = merge [repeater 1 (100 * 1000), repeater 2 (150 * 1000)]

    -- Endlessly waits @delay@ microseconds (via 'threadDelay') and then
    -- yields @val@; it never returns, hence the free result type @r@.
    repeater :: Int -> Int -> Producer Int IO r
    repeater val delay = forever $ do
        lift $ threadDelay delay
        yield val

    -- Awaits @n@ values, printing each one together with the number of
    -- values still to take (counting down from @n@), then stops.
    taker :: Int -> Consumer Int IO ()
    taker 0 = return ()
    taker n = do
        val <- await
        liftIO $ putStrLn $ "Taker " ++ show n ++ ": " ++ show val
        taker $ n - 1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.