Created
December 12, 2012 07:49
-
-
Save zsol/4265895 to your computer and use it in GitHub Desktop.
Unbounded, queue-based communication between a producer and several consumers in Haskell
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- tst | |
import Control.Concurrent.Chan | |
import Control.Concurrent | |
import Control.Exception (finally) | |
import System.IO.Unsafe (unsafePerformIO) | |
-- | Value produced for each processed message.
type Result = Int

-- | Print a start marker and an end marker for the given string, then return
-- the length of that string.
execute_pig :: String -> IO Result
execute_pig str =
  print ("start " ++ str)
    -- threadDelay $ 1000 * 200   (artificial delay, currently disabled)
    >> print ("end " ++ str)
    >> return (length str)
-- | Worker loop: consume every message that arrives on the input channel,
-- run 'execute_pig' on it and write the outcome to the output channel.
--
-- The third argument is a label that is prepended to every progress line
-- this worker prints.
processor :: Chan String -> Chan Result -> String -> IO ()
processor input output label = getChanContents input >>= mapM_ runTask
  where
    -- Print a progress line prefixed with this worker's label.
    say text = print (label ++ text)

    runTask task = do
      say (" executing " ++ task)
      outcome <- execute_pig task
      say (" done " ++ task ++ " sending as result " ++ show outcome)
      writeChan output outcome
      say (" sent " ++ task ++ " " ++ show outcome)
-- | Read results from the channel one at a time and print each of them,
-- forever; this action never returns on its own.
resultHandler :: Chan Result -> IO ()
resultHandler results = loop
  where
    loop = readChan results >>= report >> loop
    report r = print ("result" ++ show r)
-- | Global registry of completion signals: one 'MVar' per thread that has
-- been registered, filled when that thread finishes.
--
-- This is a top-level value created with 'unsafePerformIO', so it must never
-- be inlined: an inlined copy would run 'newMVar' again and produce a second,
-- unrelated registry. The NOINLINE pragma guarantees a single shared MVar.
children :: MVar [MVar ()]
children = unsafePerformIO (newMVar [])
{-# NOINLINE children #-}
-- | Block until every thread registered in 'children' has signalled
-- completion.
--
-- Signals are popped from the registry one at a time with 'modifyMVar', so
-- the registry is always put back (as an empty list once drained) and is
-- restored if an exception interrupts the update. Leaving it in place means
-- later registrations or waits do not block on an emptied registry.
waitForChildren :: IO ()
waitForChildren = do
  next <- modifyMVar children pop
  case next of
    Nothing -> return ()
    -- Wait for this child outside the registry lock so other threads can
    -- still register children in the meantime.
    Just done -> takeMVar done >> waitForChildren
  where
    -- Remove the first pending signal, if any, from the registry contents.
    pop [] = return ([], Nothing)
    pop (m : ms) = return (ms, Just m)
-- | Fork the given action in a new thread and register a completion signal
-- for it in 'children'.
--
-- The signal is filled when the action ends, whether it returns normally or
-- throws, thanks to 'finally'. Returns the 'ThreadId' of the new thread.
forkChild :: IO () -> IO ThreadId
forkChild io = do
  mvar <- newEmptyMVar
  -- modifyMVar_ restores the registry if an exception arrives mid-update;
  -- a bare takeMVar/putMVar pair could leave it empty and block later callers.
  modifyMVar_ children (return . (mvar :))
  forkIO (io `finally` putMVar mvar ())
-- | Start five labelled workers and one result printer, enqueue four tasks,
-- then wait for all registered child threads.
--
-- NOTE(review): 'resultHandler' loops unconditionally and the workers keep
-- reading from the task channel, so nothing here tells the children to stop.
-- 'waitForChildren' is therefore not expected to return through normal
-- completion of the children; a shutdown protocol would be needed for that.
main :: IO ()
main = do
  tasks <- newChan
  results <- newChan
  mapM_ (forkChild . processor tasks results) ["first", "second", "third", "fourth", "fifth"]
  -- The thread id of the result printer is not needed afterwards.
  _ <- forkChild (resultHandler results)
  print "all process started"
  mapM_ (writeChan tasks) ["a", "aa", "aaa", "aaaa"]
  waitForChildren
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment