Created
November 28, 2012 19:16
-
-
Save NicolasT/4163407 to your computer and use it in GitHub Desktop.
Haskell worker threadpool using STM
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE CPP, FlexibleContexts, BangPatterns #-} | |
-- | A small pool of worker threads that communicate over STM queues.
--
-- Work items are pushed together with a caller-chosen tag; each result is
-- handed back paired with that tag, so replies can be matched to requests.
--
-- The 'ThreadPool' type is exported abstractly (no constructor or fields) so
-- that client code can mention it in type signatures. The type synonyms used
-- in the public signatures are exported for the same reason.
module Control.Concurrent.ThreadPool (
      -- * Types
      ThreadPool
    , ThreadCount
    , QueueSize
    , Processor
    , Result
      -- * Pool lifecycle
    , createPool
    , destroyPool
    , withPool
      -- * Submitting work and collecting results
    , pushWork
    , popResult
    , popResult'
    , hasPendingWork
    ) where
import Control.Applicative | |
import Control.Exception.Base (SomeException) | |
import Control.Exception.Lifted (bracket, try) | |
import Control.Monad (replicateM, replicateM_) | |
import Control.Monad.IO.Class (MonadIO, liftIO) | |
import Control.Monad.Trans.Control (MonadBaseControl) | |
import Control.Concurrent.Lifted | |
import Control.Concurrent.STM | |
#ifdef DEBUG | |
import System.IO (hPutStrLn, stderr) | |
#endif | |
-- | Number of worker threads to start for a pool.
type ThreadCount = Int

-- | Capacity of a queue: 'Nothing' selects an unbounded queue, @'Just' n@ a
-- bounded queue of size @n@.
type QueueSize = Maybe Int
-- | Message sent to a worker over the command queue.
--
-- @c@ is a caller-supplied tag that is returned with the result, @i@ is the
-- input passed to the handler. Both fields are strict.
data Command c i
    = Execute !c !i  -- ^ Run the handler on the input and reply with the tag.
    | Stop           -- ^ Make the receiving worker leave its loop.
-- | Outcome of one work item: the tag given to 'pushWork', paired with either
-- the exception caught while running the handler or the handler's output.
type Result c o = (c, Either SomeException o)

-- | Queue carrying commands from clients to workers.
type CommandQueue c i = Queue (Command c i)

-- | Queue carrying results from workers back to clients.
type ReplyQueue c o = Queue (Result c o)

-- | The function each worker runs on an input, in monad @m@.
type Processor i m o = i -> m o
-- | Handle to a running pool of workers.
data ThreadPool c i o = ThreadPool
    { tpPending :: TVar Int
      -- ^ Counter incremented by 'pushWork' and decremented by a worker
      -- once it has written the corresponding result.
    , tpThreads :: [ThreadId]
      -- ^ Identifiers of the forked worker threads.
    , tpChanIn :: CommandQueue c i
      -- ^ Queue the workers read commands from.
    , tpChanOut :: ReplyQueue c o
      -- ^ Queue the workers write results to.
    }
-- | Uniform wrapper over the two STM queue flavours used by the pool.
data Queue a
    = Bounded (TBQueue a)   -- ^ Fixed-capacity queue.
    | Unbounded (TQueue a)  -- ^ Queue without a capacity limit.
-- | Allocate a fresh, empty queue in 'IO'.
--
-- 'Nothing' yields an 'Unbounded' queue; @'Just' n@ yields a 'Bounded' queue
-- created with capacity @n@.
--
-- The capacity goes through 'fromIntegral' so that this compiles both against
-- older @stm@ releases, where 'newTBQueueIO' takes an 'Int', and against
-- @stm >= 2.5@, where it takes a @Natural@. Negative sizes are clamped to
-- zero first, because converting a negative 'Int' to @Natural@ would raise an
-- arithmetic underflow exception.
newQueueIO :: QueueSize -> IO (Queue a)
newQueueIO Nothing = Unbounded <$> newTQueueIO
newQueueIO (Just capacity) =
    Bounded <$> newTBQueueIO (fromIntegral (max 0 capacity))
{-# INLINE newQueueIO #-}
-- | Append an item to a queue, dispatching on the queue flavour.
--
-- For a 'Bounded' queue this is 'writeTBQueue', for an 'Unbounded' one
-- 'writeTQueue'.
writeQueue :: Queue a -> a -> STM ()
writeQueue (Bounded tbq) = writeTBQueue tbq
writeQueue (Unbounded tq) = writeTQueue tq
{-# INLINE writeQueue #-}
-- | Take the next item from a queue, dispatching on the queue flavour
-- ('readTBQueue' or 'readTQueue').
readQueue :: Queue a -> STM a
readQueue (Bounded tbq) = readTBQueue tbq
readQueue (Unbounded tq) = readTQueue tq
{-# INLINE readQueue #-}
-- | Non-blocking variant of 'readQueue': dispatches to 'tryReadTBQueue' or
-- 'tryReadTQueue' and returns their 'Maybe' result unchanged.
tryReadQueue :: Queue a -> STM (Maybe a)
tryReadQueue (Bounded tbq) = tryReadTBQueue tbq
tryReadQueue (Unbounded tq) = tryReadTQueue tq
{-# INLINE tryReadQueue #-}
-- | Start a pool of worker threads.
--
-- Arguments, in order: the number of workers to fork, the size of the
-- command queue, the size of the reply queue, and the handler every worker
-- runs on each input it receives.
--
-- The pending-work counter starts at zero and both queues start empty. Each
-- worker is forked running 'worker' on the shared queues and counter.
createPool
    :: (MonadIO m, MonadBaseControl IO m)
    => ThreadCount
    -> QueueSize
    -> QueueSize
    -> Processor i m o
    -> m (ThreadPool c i o)
createPool count commandQueueSize replyQueueSize handler = do
    counter <- liftIO (newTVarIO 0)
    commands <- liftIO (newQueueIO commandQueueSize)
    replies <- liftIO (newQueueIO replyQueueSize)
    let runWorker = worker handler commands replies counter
    workerIds <- replicateM count (fork runWorker)
    return ThreadPool
        { tpPending = counter
        , tpThreads = workerIds
        , tpChanIn = commands
        , tpChanOut = replies
        }
{-# SPECIALIZE createPool :: ThreadCount
                          -> QueueSize
                          -> QueueSize
                          -> Processor i IO o
                          -> IO (ThreadPool c i o) #-}
-- | Run an STM transaction in any 'MonadIO' monad.
atomically' :: MonadIO m => STM a -> m a
atomically' transaction = liftIO (atomically transaction)
{-# INLINE atomically' #-}
-- | Ask every worker of the pool to shut down.
--
-- One 'Stop' command is enqueued per worker thread. Workers take commands
-- from the same queue as regular work, so commands queued earlier are handled
-- before a worker sees its 'Stop'. This function only enqueues the commands:
-- it neither waits for the workers to exit nor kills them.
--
-- Each 'Stop' is written in its own transaction. Writing all of them in a
-- single transaction would need as many free slots as there are workers at
-- the same instant; with a bounded command queue smaller than the thread
-- count that transaction could never commit, and the call would block
-- forever.
destroyPool :: MonadIO m => ThreadPool c i o -> m ()
destroyPool pool =
    liftIO $ replicateM_ workerCount $ atomically $ writeQueue (tpChanIn pool) Stop
  where
    workerCount = length (tpThreads pool)
{-# SPECIALIZE destroyPool :: ThreadPool c i o -> IO () #-}
-- | Submit one work item to the pool.
--
-- The second argument is a tag that is returned alongside the result; the
-- third is the input for the handler. Both are forced to weak head normal
-- form before being enqueued. Enqueuing the command and incrementing the
-- pending counter happen in a single transaction.
pushWork :: MonadIO m => ThreadPool c i o -> c -> i -> m ()
pushWork pool !tag !input = atomically' submit
  where
    submit =
        writeQueue (tpChanIn pool) (Execute tag input)
            >> modifyTVar' (tpPending pool) succ
{-# SPECIALIZE pushWork :: ThreadPool c i o -> c -> i -> IO () #-}
-- | Take the next result from the pool's reply queue, using 'readQueue'.
popResult :: MonadIO m => ThreadPool c i o -> m (Result c o)
popResult = atomically' . readQueue . tpChanOut
{-# SPECIALIZE popResult :: ThreadPool c i o -> IO (Result c o) #-}
-- | Like 'popResult', but reads the reply queue with 'tryReadQueue' and so
-- returns a 'Maybe' instead of waiting for a result.
popResult' :: MonadIO m => ThreadPool c i o -> m (Maybe (Result c o))
popResult' = atomically' . tryReadQueue . tpChanOut
{-# SPECIALIZE popResult' :: ThreadPool c i o -> IO (Maybe (Result c o)) #-}
-- | Report whether the pool's pending counter is non-zero.
--
-- The counter is incremented by 'pushWork' and decremented by a worker when
-- it writes a result to the reply queue, so 'False' does not mean that every
-- result has been popped. The original author flagged this function as
-- "no good (for now)".
hasPendingWork :: MonadIO m => ThreadPool c i o -> m Bool
hasPendingWork pool = do
    outstanding <- atomically' (readTVar (tpPending pool))
    return (outstanding /= 0)
-- | Body of a single worker thread.
--
-- Repeatedly takes a command from the command queue:
--
-- * On 'Execute', runs the handler on the input, forcing its result to weak
--   head normal form inside 'try' so that an exception raised by that
--   evaluation is captured as a 'Left'. The tagged outcome is then written to
--   the reply queue and the pending counter is decremented, both in one
--   transaction, before looping.
--
-- * On 'Stop', returns, which ends the loop.
worker
    :: (MonadIO m, MonadBaseControl IO m)
    => Processor i m o
    -> CommandQueue c i
    -> ReplyQueue c o
    -> TVar Int
    -> m ()
worker handler chanIn chanOut pending = serve
  where
    serve = do
        debug "Awaiting work"
        command <- atomically' (readQueue chanIn)
        case command of
            Stop -> debug "Shutdown"
            Execute tag input -> do
                debug "Executing command"
                outcome <- try $! (handler input >>= \res -> return $! res)
                atomically' $
                    writeQueue chanOut (tag, outcome)
                        >> modifyTVar' pending pred
                serve
{-# SPECIALIZE worker :: Processor i IO o -> CommandQueue c i -> ReplyQueue c o -> TVar Int -> IO () #-}
-- | Run an action with a freshly created pool and call 'destroyPool' on it
-- afterwards, using 'bracket' so the release step also runs when the action
-- throws.
--
-- The first four arguments are passed to 'createPool' unchanged; the last is
-- the action that receives the pool.
withPool
    :: (MonadIO m, MonadBaseControl IO m)
    => ThreadCount
    -> QueueSize
    -> QueueSize
    -> Processor i m o
    -> (ThreadPool c i o -> m a)
    -> m a
withPool count commandQueueSize replyQueueSize handler action =
    bracket acquire destroyPool action
  where
    acquire = createPool count commandQueueSize replyQueueSize handler
{-# SPECIALIZE withPool :: ThreadCount
                        -> QueueSize
                        -> QueueSize
                        -> Processor i IO o
                        -> (ThreadPool c i o -> IO a)
                        -> IO a #-}
-- | Diagnostic trace hook.
--
-- When the module is compiled with the @DEBUG@ CPP macro defined, the message
-- is written to 'stderr' prefixed with the calling thread's id in square
-- brackets. Otherwise it does nothing.
debug :: MonadIO m => String -> m ()
#ifdef DEBUG
debug msg = liftIO $
    myThreadId >>= \tid -> hPutStrLn stderr ("[" ++ show tid ++ "] " ++ msg)
#else
debug _ = return ()
#endif
{-# INLINE debug #-}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment