Skip to content

Instantly share code, notes, and snippets.

@nkpart
Last active August 29, 2015 14:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nkpart/5c83011b10b33ebc5e35 to your computer and use it in GitHub Desktop.
Save nkpart/5c83011b10b33ebc5e35 to your computer and use it in GitHub Desktop.
{-# LANGUAGE RankNTypes #-}
module Main where
import Control.Monad as M
import Control.Applicative ((<$>))
import Pipes as P
import Pipes.Prelude as P
import Pipes.Concurrent as P
import Control.Concurrent.Async as A
import Control.Monad
-- | Fan work out over several worker threads and stream the results back.
--
-- This function works: it explicitly seals the results mailbox once every
-- worker has finished writing, so the returned producer terminates.
--
-- * @threads@   – number of worker threads to start.
-- * @generator@ – producer of work items; run to completion on its own thread.
-- * @mapper@    – turns one work item into zero or more results; runs on the
--                 worker threads.
--
-- The returned producer yields whatever the workers wrote to the results
-- mailbox and ends once that mailbox has been sealed and drained.
forkJoin
    :: (MonadIO m, Enum n, Num n)
    => n
    -> Producer a IO ()
    -> (a -> Producer b IO ())
    -> Producer' b m ()
forkJoin threads generator mapper = do
    -- Work queue: never sealed explicitly. NOTE(review): presumably it relies
    -- on the 'performGC' calls below to get sealed by the garbage collector.
    (workOut, workIn) <- liftIO $ spawn Unbounded
    -- Results queue: created with spawn' so that it can be sealed by hand.
    (resultOut, resultIn, sealResults) <- liftIO $ spawn' Unbounded

    -- Feeder thread: pushes every generated item into the work queue.
    _ <- liftIO . async $ do
        runEffect $ generator >-> toOutput workOut
        performGC

    -- Worker threads: map items from the work queue into the results queue.
    workers <- forM [1 .. threads] $ \_ -> liftIO . A.async $ do
        runEffect $ for (fromInput workIn) mapper >-> toOutput resultOut
        performGC

    -- Supervisor thread: seal the results queue once all workers are done.
    -- 'waitCatch' (not 'wait') is used so that a worker that died with an
    -- exception cannot abort this thread before the seal is performed.
    -- 'M.mapM_' is qualified because Pipes.Prelude, imported unqualified,
    -- exports its own 'mapM_'.
    _ <- liftIO . async $ do
        M.mapM_ waitCatch workers
        atomically sealResults

    fromInput resultIn
-- | Same fan-out/fan-in pipeline as 'forkJoin', kept as a failing reproduction.
--
-- Most runs die with @thread blocked indefinitely in an STM transaction@.
-- The only difference from 'forkJoin' is that nothing here explicitly seals
-- the second (results) mailbox after the workers have finished.
forkJoin2 threads generator mapper = do
    (workOut, workIn) <- liftIO (spawn Unbounded)
    (resultOut, resultIn) <- liftIO (spawn Unbounded)

    -- Feeder thread: run the generator into the work mailbox.
    _ <- liftIO . async $
        runEffect (generator >-> toOutput workOut) >> performGC

    -- Worker threads: their handles are dropped, nobody waits for them.
    _ <- forM [1 .. threads] $ \_ ->
        liftIO . A.async $
            runEffect (for (fromInput workIn) mapper >-> toOutput resultOut)
                >> performGC

    -- Deliberately no explicit seal of the results mailbox (see above).
    fromInput resultIn
-- | Run each variant ten times, printing the concatenated worker output of
-- every run; a numeric marker is printed before each batch.
main :: IO ()
main = do
    Prelude.print 1
    M.replicateM_ 10 $
        collect (forkJoin 10 (P.each [1..10]) (yield . Prelude.show))
            >>= Prelude.print
    Prelude.print 2
    M.replicateM_ 10 $
        collect (forkJoin2 10 (P.each [1..10]) (yield . Prelude.show))
            >>= Prelude.print
    putStrLn "Done."
  where
    -- Append every yielded chunk onto one accumulated list.
    collect = P.fold (++) [] id
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment