Skip to content

Instantly share code, notes, and snippets.

@NicolasT
Created November 28, 2012 19:16
Show Gist options
  • Save NicolasT/4163407 to your computer and use it in GitHub Desktop.
Save NicolasT/4163407 to your computer and use it in GitHub Desktop.
Haskell worker threadpool using STM
{-# LANGUAGE CPP, FlexibleContexts, BangPatterns #-}
module Control.Concurrent.ThreadPool (
createPool
, destroyPool
, withPool
, pushWork
, popResult
, popResult'
, hasPendingWork
) where
import Control.Applicative
import Control.Exception.Base (SomeException)
import Control.Exception.Lifted (bracket, try)
import Control.Monad (replicateM, replicateM_)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Trans.Control (MonadBaseControl)
import Control.Concurrent.Lifted
import Control.Concurrent.STM
#ifdef DEBUG
import System.IO (hPutStrLn, stderr)
#endif
type ThreadCount = Int
type QueueSize = Maybe Int
data Command c i = Execute !c !i
| Stop
type Result c o = (c, Either SomeException o)
type CommandQueue c i = Queue (Command c i)
type ReplyQueue c o = Queue (Result c o)
type Processor i m o = i -> m o
data ThreadPool c i o = ThreadPool { tpPending :: TVar Int
, tpThreads :: [ThreadId]
, tpChanIn :: CommandQueue c i
, tpChanOut :: ReplyQueue c o
}
data Queue a = Bounded (TBQueue a)
| Unbounded (TQueue a)
newQueueIO :: QueueSize -> IO (Queue a)
newQueueIO l = case l of
Nothing -> Unbounded <$> newTQueueIO
Just l' -> Bounded <$> newTBQueueIO l'
{-# INLINE newQueueIO #-}
writeQueue :: Queue a -> a -> STM ()
writeQueue q = case q of
Bounded q' -> writeTBQueue q'
Unbounded q' -> writeTQueue q'
{-# INLINE writeQueue #-}
readQueue :: Queue a -> STM a
readQueue q = case q of
Bounded q' -> readTBQueue q'
Unbounded q' -> readTQueue q'
{-# INLINE readQueue #-}
tryReadQueue :: Queue a -> STM (Maybe a)
tryReadQueue q = case q of
Bounded q' -> tryReadTBQueue q'
Unbounded q' -> tryReadTQueue q'
{-# INLINE tryReadQueue #-}
createPool :: (MonadIO m, MonadBaseControl IO m) => ThreadCount
-> QueueSize
-> QueueSize
-> Processor i m o
-> m (ThreadPool c i o)
createPool count commandQueueSize replyQueueSize handler = do
pending <- liftIO $ newTVarIO 0
chanIn <- liftIO $ newQueueIO commandQueueSize
chanOut <- liftIO $ newQueueIO replyQueueSize
threads <- replicateM count $ fork $ worker handler chanIn chanOut pending
return ThreadPool { tpPending = pending
, tpThreads = threads
, tpChanIn = chanIn
, tpChanOut = chanOut
}
{-# SPECIALIZE createPool :: ThreadCount
-> QueueSize
-> QueueSize
-> Processor i IO o
-> IO (ThreadPool c i o) #-}
atomically' :: MonadIO m => STM a -> m a
atomically' = liftIO . atomically
{-# INLINE atomically' #-}
destroyPool :: MonadIO m => ThreadPool c i o -> m ()
destroyPool pool =
atomically' $ replicateM_ (length $ tpThreads pool) $ writeQueue (tpChanIn pool) Stop
{-# SPECIALIZE destroyPool :: ThreadPool c i o -> IO () #-}
pushWork :: MonadIO m => ThreadPool c i o -> c -> i -> m ()
pushWork pool !c !i = atomically' $ do
writeQueue (tpChanIn pool) (Execute c i)
modifyTVar' (tpPending pool) succ
{-# SPECIALIZE pushWork :: ThreadPool c i o -> c -> i -> IO () #-}
popResult :: MonadIO m => ThreadPool c i o -> m (Result c o)
popResult pool = atomically' $ readQueue (tpChanOut pool)
{-# SPECIALIZE popResult :: ThreadPool c i o -> IO (Result c o) #-}
popResult' :: MonadIO m => ThreadPool c i o -> m (Maybe (Result c o))
popResult' pool = atomically' $ tryReadQueue (tpChanOut pool)
{-# SPECIALIZE popResult' :: ThreadPool c i o -> IO (Maybe (Result c o)) #-}
-- This is... no good (for now)
hasPendingWork :: MonadIO m => ThreadPool c i o -> m Bool
hasPendingWork pool = atomically' $ (/= 0) <$> readTVar (tpPending pool)
worker :: (MonadIO m, MonadBaseControl IO m) => Processor i m o
-> CommandQueue c i
-> ReplyQueue c o
-> TVar Int
-> m ()
worker handler chanIn chanOut pending = loop
where
loop = do
debug "Awaiting work"
req <- atomically' $ readQueue chanIn
case req of
Execute c i -> do
debug "Executing command"
r <- try $! do
res <- handler i
return $! res
atomically' $ do
writeQueue chanOut (c, r)
modifyTVar' pending pred
loop
Stop -> do
debug "Shutdown"
return ()
{-# SPECIALIZE worker :: Processor i IO o -> CommandQueue c i -> ReplyQueue c o -> TVar Int -> IO () #-}
withPool :: (MonadIO m, MonadBaseControl IO m) => ThreadCount
-> QueueSize
-> QueueSize
-> Processor i m o
-> (ThreadPool c i o -> m a)
-> m a
withPool count commandQueueSize replyQueueSize handler =
bracket
(createPool count commandQueueSize replyQueueSize handler)
destroyPool
{-# SPECIALIZE withPool :: ThreadCount
-> QueueSize
-> QueueSize
-> Processor i IO o
-> (ThreadPool c i o -> IO a)
-> IO a #-}
debug :: MonadIO m => String -> m ()
#ifndef DEBUG
debug _ = return ()
#else
debug s = liftIO $ do
tid <- myThreadId
hPutStrLn stderr $ "[" ++ show tid ++ "] " ++ s
#endif
{-# INLINE debug #-}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment