Skip to content

Instantly share code, notes, and snippets.

@zsol
Created December 12, 2012 07:49
Show Gist options
  • Save zsol/4265895 to your computer and use it in GitHub Desktop.
Save zsol/4265895 to your computer and use it in GitHub Desktop.
unbounded queue based communication between producer and several consumers in haskell
-- tst
import Control.Concurrent.Chan
import Control.Concurrent
import Control.Exception (finally)
import System.IO.Unsafe (unsafePerformIO)
type Result = Int
execute_pig::String-> IO Result
execute_pig str = do
print $ "start " ++ str
--threadDelay $ 1000 * 200
print $ "end " ++ str
return $ length str
processor::Chan String->Chan Result->String->IO ()
processor input output id = do
msgs <- getChanContents input
mapM_ process msgs
where
process msg = do
log $ " executing " ++ msg
res <- execute_pig msg
log $ " done " ++ msg ++ " sending as result " ++ show res
writeChan output res
log $ " sent " ++ msg ++ " " ++ show res
log = print . (++) id
resultHandler::Chan Result->IO ()
resultHandler results = do
r <- readChan results
print $ "result" ++ (show r)
resultHandler results
children :: MVar [MVar ()]
children = unsafePerformIO (newMVar [])
waitForChildren :: IO ()
waitForChildren = do
cs <- takeMVar children
case cs of
[] -> return ()
m:ms -> do
putMVar children ms
takeMVar m
waitForChildren
forkChild :: IO () -> IO ThreadId
forkChild io = do
mvar <- newEmptyMVar
childs <- takeMVar children
putMVar children (mvar:childs)
forkIO (io `finally` putMVar mvar ())
main = do
tasks <- newChan
results <- newChan
mapM_ (forkChild . processor tasks results) ["first", "second", "third", "fourth", "fifth"]
result <- forkChild (resultHandler results)
print "all process started"
writeChan tasks "a"
writeChan tasks "aa"
writeChan tasks "aaa"
writeChan tasks "aaaa"
waitForChildren
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment