Skip to content

Instantly share code, notes, and snippets.

@parsonsmatt parsonsmatt/jobs.hs
Last active Nov 22, 2016

Embed
What would you like to do?
Multithreaded job system
module Jerbs where
import Control.Concurrent (forkIO, killThread)
import Control.Concurrent.STM (TQueue, atomically, newTQueueIO,
readTQueue, writeTQueue)
import Control.Exception (SomeException (..), try)
import Control.Monad (forever)
import Data.Foldable (for_)
import Data.Traversable (for)
runJobs :: Foldable f => f a -> Int -> (a -> IO b) -> IO (TQueue b)
runJobs things jobs action = do
thingQueue <- newTQueueIO
resultQueue <- newTQueueIO
finishedQueue <- newTQueueIO
-- start the worker threads
threads <- for [1..jobs] $ \_ -> forkIO . forever $ do
thing <- atomically $ readTQueue thingQueue
eresult <- try $ action thing
case eresult of
Left err@(SomeException _) -> print err
Right a -> atomically $ writeTQueue resultQueue a
atomically $ writeTQueue finishedQueue ()
-- feed the queue
for_ things (atomically . writeTQueue thingQueue)
-- fork a thread to wait for all things to finish, then kill the
-- threads
forkIO $ do
for_ [1..jobs] $ \_ -> atomically $ readTQueue finishedQueue
for_ threads killThread
-- return the queue of results
pure resultQueue
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.