Last active
August 29, 2015 14:03
-
-
Save nkpart/5c83011b10b33ebc5e35 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE RankNTypes #-} | |
module Main where | |
import Control.Monad as M | |
import Control.Applicative ((<$>)) | |
import Pipes as P | |
import Pipes.Prelude as P | |
import Pipes.Concurrent as P | |
import Control.Concurrent.Async as A | |
import Control.Monad | |
-- | This function works. | |
-- It explicitly seals the second set of channels when the workers have written all their data. | |
forkJoin threads generator mapper = do | |
(output, input) <- liftIO $ spawn Unbounded | |
(outputPeople, inputPeople, seal2) <- liftIO $ spawn' Unbounded | |
liftIO . async $ do | |
runEffect $ generator >-> toOutput output | |
performGC | |
vs <- forM [1..threads] $ \i -> liftIO . A.async $ do | |
runEffect $ for (fromInput input) mapper >-> (toOutput outputPeople) | |
performGC | |
liftIO . async $ do | |
mapM_ wait vs | |
atomically seal2 | |
fromInput inputPeople | |
-- | This function fails with `thread blocked indefinitely in an STM transaction` most of the time. | |
-- The difference is that this function doesn't explicitly seal the second set of channels. | |
forkJoin2 threads generator mapper = do | |
(output, input) <- liftIO $ spawn Unbounded | |
(outputPeople, inputPeople) <- liftIO $ spawn Unbounded | |
liftIO . async $ do | |
runEffect $ generator >-> toOutput output | |
performGC | |
_ <- forM [1..threads] $ \i -> liftIO . A.async $ do | |
runEffect $ for (fromInput input) mapper >-> (toOutput outputPeople) | |
performGC | |
fromInput inputPeople | |
main :: IO () | |
main = do | |
let reducer = P.fold (++) [] id | |
Prelude.print 1 | |
M.replicateM_ 10 $ do | |
Prelude.print =<< reducer (forkJoin 10 (P.each [1..10]) (yield . Prelude.show)) | |
Prelude.print 2 | |
M.replicateM_ 10 $ do | |
Prelude.print =<< reducer (forkJoin2 10 (P.each [1..10]) (yield . Prelude.show)) | |
putStrLn "Done." |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment