Last active
November 22, 2016 16:35
-
-
Save parsonsmatt/9862427d05c4c633ce6aafb00eb77f0a to your computer and use it in GitHub Desktop.
Multithreaded job system
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Jerbs where | |
import Control.Concurrent (forkIO, killThread) | |
import Control.Concurrent.STM (TQueue, atomically, newTQueueIO, | |
readTQueue, writeTQueue) | |
import Control.Exception (SomeException (..), try) | |
import Control.Monad (forever) | |
import Data.Foldable (for_) | |
import Data.Traversable (for) | |
-- | Run @action@ over every element of @things@ on a pool of @jobs@ worker
-- threads and return the queue that successful results are written to.
--
-- The call returns as soon as all work has been enqueued; results appear on
-- the returned 'TQueue' as workers finish, in completion order rather than
-- input order.
--
-- If @action@ throws, the exception is printed and no result is written for
-- that item, so the queue may receive fewer results than there are inputs.
--
-- When @jobs@ is less than 1 no workers are forked and nothing is processed.
runJobs :: Foldable f => f a -> Int -> (a -> IO b) -> IO (TQueue b)
runJobs things jobs action = do
  thingQueue <- newTQueueIO
  resultQueue <- newTQueueIO
  finishedQueue <- newTQueueIO

  -- Start the worker threads. Each one loops forever pulling work items and
  -- signals on finishedQueue once per processed item (success or failure).
  threads <- for [1 .. jobs] $ \_ -> forkIO . forever $ do
    thing <- atomically $ readTQueue thingQueue
    eresult <- try $ action thing
    case eresult of
      Left err@(SomeException _) -> print err
      Right a -> atomically $ writeTQueue resultQueue a
    atomically $ writeTQueue finishedQueue ()

  -- Feed the work queue.
  for_ things (atomically . writeTQueue thingQueue)

  -- Fork a thread that waits for one completion signal per work item (not
  -- per worker) and only then kills the workers. Waiting for just `jobs`
  -- signals would kill the pool after the first `jobs` items whenever there
  -- are more items than workers, and would never finish when there are
  -- fewer. By the time every item has signalled, the workers are idle and
  -- blocked on the empty work queue, outside the `try`, so the kill is not
  -- swallowed by the SomeException handler.
  _ <- forkIO $ do
    for_ things $ \_ -> atomically $ readTQueue finishedQueue
    for_ threads killThread

  -- Return the queue of results.
  pure resultQueue
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment