@gelisam
Last active April 15, 2019 04:04
benchmarking various implementations of parallel traverse
-- in response to https://twitter.com/snoyberg/status/1116710317554315265
--
-- Green threads are very cheap, so I want to investigate under which
-- circumstances it is worth making the logic more complex in order to minimize
-- their number. So I wrote multiple implementations and ran them with
-- different numbers of inputs and different task sizes.
--
-- On my four-capability machine, my observations are:
--
--   * With very small (10 μs) tasks, the single-threaded 'traverse' wins,
--     regardless of the number of tasks. That's because all the
--     implementations perform a linear amount of overhead work in a single
--     thread, so if the jobs are small enough, the overhead of a job can be
--     larger than the cost of the job itself. The rest of these observations
--     only consider the larger tasks (100 μs and 1000 μs).
--   * With a moderate number of tasks (10 or 100), all the implementations
--     are about equally fast, regardless of the size of the tasks. This means
--     you probably don't need to use 'getNumCapabilities' to make sure you're
--     not allocating more threads than the number of available cores; as long
--     as the number of threads isn't too crazy, their exact number doesn't
--     seem to matter.
--   * With the largest number of tasks (1000), the large number of green
--     threads does seem to have a small impact, as the QSem implementation is
--     about 5% slower than the other implementations. I tested with an even
--     larger number of tasks, and the 5% figure seems constant.
--
-- You can see my results and analyze them for yourself here: http://gelisam.com/files/speedy-traverse/benchmark.html
-- Or you can run the benchmark yourself by compiling this program with "-O2
-- -threaded -with-rtsopts=-N -rtsopts" and running it with "--output
-- benchmark.html".
module Main where

import Control.Concurrent
import Control.Concurrent.Async
import Control.Exception
import Control.Monad
import Control.Monad.Loops
import Criterion.Main
import Data.Foldable
import Data.IORef
import Data.Traversable
import qualified Data.Vector as Vector
import qualified UnliftIO.Async as UnliftIO

-- the original from
-- https://twitter.com/HandyHaskell/status/1116391447085899776, which
-- supposedly allocates too many threads
speedyTraverseQSem
  :: Traversable t
  => Maybe Int
  -> (a -> IO b)
  -> t a
  -> IO (t b)
speedyTraverseQSem Nothing  f jobs = mapConcurrently f jobs
speedyTraverseQSem (Just j) f jobs = do
  sem <- newQSem j
  mapConcurrently (bracket_ (waitQSem sem) (signalQSem sem) . f) jobs
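
-- A usage sketch, not part of the original gist: 'exampleQSem' and
-- 'slowIncrement' are hypothetical names, and the limit of 2 is arbitrary.
-- 'Just 2' forks one green thread per job but lets at most two of them run
-- the action at a time; 'Nothing' would run all of them without a limit.
-- Since 'mapConcurrently' keeps the results in input order, this should
-- return [2,3,4].
exampleQSem :: IO [Int]
exampleQSem = speedyTraverseQSem (Just 2) slowIncrement [1, 2, 3]
  where
    slowIncrement :: Int -> IO Int
    slowIncrement x = do
      threadDelay 1000  -- simulate roughly 1 ms of work
      pure (x + 1)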

-- a supposedly-better version which only forks j threads
speedyTraverseChan
  :: Traversable t
  => Maybe Int
  -> (a -> IO b)
  -> t a
  -> IO (t b)
speedyTraverseChan Nothing  f jobs = mapConcurrently f jobs
speedyTraverseChan (Just j) f jobs = do
  chan <- newChan
  outputVars <- for jobs $ \job -> do
    outputVar <- newEmptyMVar
    writeChan chan (Just (job, outputVar))
    pure outputVar
  -- Chan doesn't support tryReadChan, so we simulate it by pushing Nothings
  writeList2Chan chan (replicate j Nothing)
  replicateConcurrently_ j $ do
    whileJust_ (readChan chan) $ \(job, outputVar) -> do
      output <- f job
      putMVar outputVar output
  mapM takeMVar outputVars

-- a version which only uses IORefs, to avoid the overhead of MVars (Chan is
-- implemented using MVar).
speedyTraverseIORef
  :: Traversable t
  => Maybe Int
  -> (a -> IO b)
  -> t a
  -> IO (t b)
speedyTraverseIORef Nothing  f jobs = mapConcurrently f jobs
speedyTraverseIORef (Just j) f jobs = do
  outputVars <- for jobs $ \_ -> newIORef undefined
  -- 'evaluate' makes sure the queue is only created once on the main thread
  queue <- evaluate $ Vector.fromList $ zip (toList jobs) (toList outputVars)
  n <- newIORef (Vector.length queue - 1)
  let worker = do
        i <- atomicDecr n
        unless (i < 0) $ do
          let (job, outputVar) = queue Vector.! i
          output <- f job
          writeIORef outputVar output
          worker
  replicateConcurrently_ j worker
  mapM readIORef outputVars
  where
    atomicDecr :: IORef Int -> IO Int
    atomicDecr ref = atomicModifyIORef' ref (\x -> (x-1, x))

-- n tasks, each of which takes d microseconds
runTasks :: ((Int -> IO ()) -> [Int] -> IO [()])
         -> Int -> Int -> IO ()
runTasks myTraverse n d = last <$> myTraverse threadDelay (replicate n d)

-- try different values for n and d. the numbers were chosen so that the
-- benchmark doesn't take too long; with those numbers, it currently takes
-- about 4 minutes to run it.
mkBGroup :: ((Int -> IO ()) -> [Int] -> IO [()])
         -> [Benchmark]
mkBGroup myTraverse = flip fmap [10,100,1000] $ \n
  -> bgroup (show n)
   $ flip fmap [10,100,1000] $ \d
  -> bench (show d)
   $ whnfIO (runTasks myTraverse n d)
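
-- A minimal sanity check, not part of the original benchmark: since the
-- benchmark only measures 'threadDelay' and discards the results, it may be
-- worth confirming separately that every implementation returns its outputs
-- in input order. 'checkOrder' and 'checkAllOrders' are hypothetical names,
-- and the worker count of 4 is an arbitrary assumption.
checkOrder :: ((Int -> IO Int) -> [Int] -> IO [Int]) -> IO Bool
checkOrder myTraverse = do
  let inputs = [1 .. 100]
  -- double every input, then compare against the pure result
  outputs <- myTraverse (pure . (* 2)) inputs
  pure (outputs == map (* 2) inputs)

-- runs the check against each implementation; True means they all agree
-- with the pure 'map'
checkAllOrders :: IO Bool
checkAllOrders = and <$> traverse checkOrder
  [ traverse
  , speedyTraverseQSem (Just 4)
  , speedyTraverseChan (Just 4)
  , speedyTraverseIORef (Just 4)
  , UnliftIO.pooledMapConcurrentlyN 4
  ]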

main :: IO ()
main = do
  n <- getNumCapabilities
  putStrLn $ "number of threads: " ++ show n
  defaultMain
    [ bgroup "SingleThreaded" $ mkBGroup traverse
    , bgroup "QSem"           $ mkBGroup (speedyTraverseQSem (Just n))
    , bgroup "Chan"           $ mkBGroup (speedyTraverseChan (Just n))
    , bgroup "IORef"          $ mkBGroup (speedyTraverseIORef (Just n))
    , bgroup "UnliftIO"       $ mkBGroup (UnliftIO.pooledMapConcurrentlyN n)
    ]