Skip to content

Instantly share code, notes, and snippets.

@khibino
Created September 20, 2021 16:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save khibino/d21370b441fe06b29eac0758b9741160 to your computer and use it in GitHub Desktop.
Save khibino/d21370b441fe06b29eac0758b9741160 to your computer and use it in GitHub Desktop.
{--# OPTIONS_GHC -Wno-name-shadowing #-}
import Control.Concurrent (forkIO)
import Control.Concurrent (threadDelay)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Exception (finally)
import Control.Monad (void, replicateM, replicateM_)
import System.IO (BufferMode (LineBuffering), hSetBuffering, stdout)
-----
-- | A 'Chan' wrapped with a phantom tag @k@, so that queues carrying the same
-- payload type but serving different purposes are distinct types.
newtype Queue k a = Queue (Chan a)

-- | Create a fresh, empty queue.
newQueue :: IO (Queue k a)
newQueue = fmap Queue newChan

-- | Take the next item from the queue, blocking while it is empty.
readQueue :: Queue k a -> IO a
readQueue (Queue chan) = readChan chan

-- | Append an item to the queue.
writeQueue :: Queue k a -> a -> IO ()
writeQueue (Queue chan) item = writeChan chan item

-- | Append the 'Nothing' sentinel to a queue of 'Maybe' values.
-- Readers in this file treat that sentinel as "no more items".
closeQueue :: Queue k (Maybe a) -> IO ()
closeQueue queue = writeQueue queue Nothing
-----
-- | Identifier carried by every job.
type JobId = Int

-- Each queue role gets an uninhabited phantom tag, declared right next to the
-- alias that uses it.

-- | Tag for dependency-notification queues.
data Dep

-- | Receives @(job id, result)@ pairs from jobs that notify this queue.
type DepQ a = Queue Dep (JobId, a)

-- | Tag for the capability queue.
data Cap

-- | Holds one unit token per available execution slot.
type CapQ = Queue Cap ()

-- | Tag for the runnable-job queue.
data Runnable

-- | Jobs ready to run; 'Nothing' is the end-of-stream sentinel.
type RunnableQ a = Queue Runnable (Maybe (Job a))

-- | Tag for the resolve-notification queue.
data Resolve

-- | Receives one unit token each time a job becomes runnable.
type ResolveQ = Queue Resolve ()

-- | Tag for the result queue.
data Result

-- | Receives the result of every finished job.
type ResultQ a = Queue Result a
-- | What a job must wait for before it becomes runnable.
data WaitDepends a = WaitDepends
  { depQueue :: DepQ a
    -- ^ queue on which dependency notifications arrive
  , depCount :: Int
    -- ^ how many notifications to read from 'depQueue' before proceeding
  }
-- | A unit of work together with its dependency wiring.
data Job a = Job
  { jobId :: JobId
    -- ^ identifier sent along with the result to every notified queue
  , action :: IO a
    -- ^ the work to perform
  , depends :: Maybe (WaitDepends a)
    -- ^ notifications to wait for first; 'Nothing' means no waiting
  , notifyList :: [DepQ a]
    -- ^ queues that receive @(jobId, result)@ once 'action' has finished
  }
-- | Dispatcher loop: run runnable jobs, each in its own thread, with at most as
-- many in flight as there are tokens circulating in the capability queue.
--
-- Each iteration first takes one capability token, then reads the next entry
-- of the runnable queue. A 'Nothing' entry is the sentinel that ends the loop
-- (the token taken for that iteration is not returned).
--
-- The capability token is handed back with 'finally', so a job whose action
-- throws no longer leaks its slot and starves the jobs queued behind it.
-- NOTE(review): a failing job still sends no result and no dependency
-- notification, so anything waiting on those keeps waiting — confirm whether
-- failures need to be reported through the queues as well.
invokeJobs :: CapQ -> RunnableQ a -> ResultQ a -> IO ()
invokeJobs capQ runQ resQ = loop
  where
    loop = do
      readQueue capQ -- wait to get one capability
      next <- readQueue runQ
      case next of
        Nothing -> pure () -- end with sentinel
        Just rj -> invoke rj *> loop

    -- Fork the job; the capability is released whether or not it succeeds.
    invoke rj = forkIO $ runJob rj `finally` writeQueue capQ ()

    runJob rj = do
      x <- action rj
      mapM_ (\dq -> writeQueue dq (jobId rj, x)) (notifyList rj)
      writeQueue resQ x -- push job result
-- | Block until the job's dependencies (if any) have all reported, then
-- publish the job on the runnable queue and post one token on the resolve
-- queue.
waitDepends :: RunnableQ a -> ResolveQ -> Job a -> IO ()
waitDepends runQ rsvQ job = do
  mapM_ drain (depends job) -- no-op when the job has no dependencies
  writeQueue runQ (Just job) -- job is runnable
  writeQueue rsvQ () -- notify resolved
  where
    -- Consume exactly the expected number of dependency notifications.
    drain (WaitDepends queue expected) = replicateM_ expected (readQueue queue)
-- | Read @count@ tokens from the resolve queue, then close the runnable queue
-- by enqueueing its sentinel, which is what stops 'invokeJobs'.
waitResolved :: Int -> ResolveQ -> RunnableQ a -> IO ()
waitResolved count rsvQ runQ =
  replicateM_ count (readQueue rsvQ) >> closeQueue runQ
-- | Collect @count@ results from the result queue, in the order they arrive.
waitResult :: Int -> ResultQ a -> IO [a]
waitResult count = replicateM count . readQueue
-----
-- | Tag for the log queue.
data Log

-- | Log lines to print; 'Nothing' is the end-of-stream sentinel.
type LogQ = Queue Log (Maybe String)

-- | Build a queue-backed logger and return three actions:
--
-- 1. the printing loop, which writes queued lines with 'putStrLn' until it
--    reads the sentinel;
-- 2. a function that enqueues one line;
-- 3. an action that enqueues the sentinel and so ends the printing loop.
getLogger :: IO (IO (), String -> IO (), IO ())
getLogger = do
  logQ <- newQueue :: IO LogQ
  let pump = do
        entry <- readQueue logQ
        case entry of
          Nothing   -> pure ()
          Just line -> putStrLn line *> pump
      emit = writeQueue logQ . Just
      shutdown = closeQueue logQ
  return (pump, emit, shutdown)
-----
-- | Handle to a running job dispatcher plus its logger.
data Runner a = Runner
  { runnable :: RunnableQ a
    -- ^ queue of jobs whose dependencies are resolved
  , resolve :: ResolveQ
    -- ^ receives one token per job that has become runnable
  , result :: ResultQ a
    -- ^ receives the result of each finished job
  , jobCount :: !Int
    -- ^ number of jobs added so far
  , loggerThread :: IO ()
    -- ^ printing loop of the logger; runs until the logger is closed
  , putLogLn :: String -> IO ()
    -- ^ enqueue one log line
  , closeLogger :: IO ()
    -- ^ make 'loggerThread' terminate
  }
-- | Create a runner: allocate its queues and logger, fork the dispatcher
-- ('invokeJobs'), and seed the capability queue.
--
-- A capability count below 1 is treated as 1. With no capability token in the
-- queue the dispatcher could never start a job, and nothing else in the
-- runner adds tokens later.
--
-- The returned runner has a job count of 0; its logger loop is not started
-- here.
prunner :: Int -- ^ capability count (values below 1 are treated as 1)
        -> IO (Runner a)
prunner cap = do
  capQ <- newQueue
  runQ <- newQueue
  rsvQ <- newQueue
  resQ <- newQueue
  (logTh, putLn, close) <- getLogger
  void $ forkIO $ invokeJobs capQ runQ resQ
  -- One token per execution slot.
  replicateM_ (max 1 cap) (writeQueue capQ ())
  return $ Runner runQ rsvQ resQ 0 logTh putLn close
-- | Register jobs with the runner. One thread is forked per job to wait for
-- that job's dependencies; the returned runner has its job count increased by
-- the number of jobs given.
addJob :: Runner a -> [Job a] -> IO (Runner a)
addJob r@(Runner { runnable = pending, resolve = resolved, jobCount = known }) jobs = do
  mapM_ spawnWaiter jobs
  pure r { jobCount = known + length jobs }
  where
    spawnWaiter job = forkIO (waitDepends pending resolved job)
-- | Wait until every registered job has become runnable, close the runnable
-- queue, and then collect one result per registered job.
waitJobResult :: Runner a -> IO [a]
waitJobResult runner =
  waitResolved total (resolve runner) (runnable runner)
    >> waitResult total (result runner)
  where
    total = jobCount runner
-- | Run the given jobs on a fresh runner with @cap@ capabilities and return
-- their results.
runParallel :: Int -> [Job a] -> IO [a]
runParallel cap jobs = do
  runner <- prunner cap
  loaded <- addJob runner jobs
  waitJobResult loaded
-----
-- | Demo workload: log a start message, sleep, then log a finish message.
-- The sleep is @i `rem` 2 + 1@ seconds.
delayJob :: (String -> IO ()) -> Int -> IO ()
delayJob logLn i = do
  announce " started"
  threadDelay (seconds * 1000 * 1000) -- threadDelay takes microseconds
  announce " finished"
  where
    announce suffix = logLn $ "job " ++ show i ++ suffix
    seconds = i `rem` 2 + 1
-- | Build a demo job whose action is 'delayJob' for the given id, with the
-- supplied dependency description and notification list.
job_ :: (String -> IO ()) -> JobId -> Maybe (WaitDepends ()) -> [DepQ ()] -> Job ()
job_ logLn i deps targets =
  Job
    { jobId = i
    , action = delayJob logLn i
    , depends = deps
    , notifyList = targets
    }
-- | Demo job set: jobs 1 and 2 each wait for 10 notifications on their own
-- dependency queue, and jobs 3 through 12 have no dependencies and notify
-- both of those queues when they finish.
exampleJobs :: (String -> IO ()) -> IO ([Job ()])
exampleJobs logLn = do
  depQ1 <- newQueue
  depQ2 <- newQueue
  let targets = [depQ1, depQ2]
      -- A job gated on 10 notifications from the given queue.
      gated ident queue = job_ logLn ident (Just (WaitDepends queue 10)) []
      -- The independent jobs that feed both gated jobs.
      feeders = [ job_ logLn i Nothing targets | i <- take 10 [3 ..] ]
  return (gated 1 depQ1 : gated 2 depQ2 : feeders)
-- | Run the demo: line-buffer stdout, start a runner with 4 capabilities, and
-- process the example jobs in a background thread while the calling thread
-- runs the logger loop. The background thread closes the logger once all
-- results are in, which ends the logger loop and therefore this action.
runExample :: IO ()
runExample = do
  hSetBuffering stdout LineBuffering
  runner <- prunner 4
  jobs <- exampleJobs (putLogLn runner)
  _ <- forkIO $ do
    loaded <- addJob runner jobs
    _ <- waitJobResult loaded
    closeLogger runner
  loggerThread runner
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment