Skip to content

Instantly share code, notes, and snippets.

@parsonsmatt
Last active November 22, 2016 16:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save parsonsmatt/9862427d05c4c633ce6aafb00eb77f0a to your computer and use it in GitHub Desktop.
Save parsonsmatt/9862427d05c4c633ce6aafb00eb77f0a to your computer and use it in GitHub Desktop.
Multithreaded job system
module Jerbs where
import Control.Concurrent (forkIO, killThread)
import Control.Concurrent.STM (TQueue, atomically, newTQueueIO,
readTQueue, writeTQueue)
import Control.Exception (SomeException (..), try)
import Control.Monad (forever)
import Data.Foldable (for_)
import Data.Traversable (for)
runJobs :: Foldable f => f a -> Int -> (a -> IO b) -> IO (TQueue b)
runJobs things jobs action = do
thingQueue <- newTQueueIO
resultQueue <- newTQueueIO
finishedQueue <- newTQueueIO
-- start the worker threads
threads <- for [1..jobs] $ \_ -> forkIO . forever $ do
thing <- atomically $ readTQueue thingQueue
eresult <- try $ action thing
case eresult of
Left err@(SomeException _) -> print err
Right a -> atomically $ writeTQueue resultQueue a
atomically $ writeTQueue finishedQueue ()
-- feed the queue
for_ things (atomically . writeTQueue thingQueue)
-- fork a thread to wait for all things to finish, then kill the
-- threads
forkIO $ do
for_ [1..jobs] $ \_ -> atomically $ readTQueue finishedQueue
for_ threads killThread
-- return the queue of results
pure resultQueue
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment