Skip to content

Instantly share code, notes, and snippets.

@Elvecent
Last active September 1, 2019 23:52
Show Gist options
  • Save Elvecent/08361d61b6c174eedc9bab69b24ecebd to your computer and use it in GitHub Desktop.
Save Elvecent/08361d61b6c174eedc9bab69b24ecebd to your computer and use it in GitHub Desktop.
Running jobs asynchronously but yielding results in order
module Utils.Concurrent (mkPipeline, launchNukes) where
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Async, async, cancel, wait)
import Control.Concurrent.STM (TBQueue, atomically, newTBQueueIO,
readTBQueue, writeTBQueue)
import Control.Monad (forever, void)
import Control.Monad.IO.Class (MonadIO, liftIO)
import GHC.Natural (Natural)
runJobs :: TBQueue a -> TBQueue (Async b) -> (a -> IO b) -> IO ()
runJobs inputs runningJobs work = forever $ do
input <- atomically $ readTBQueue inputs
runningJob <- async $ work input
atomically $ writeTBQueue runningJobs runningJob
orderJobs :: TBQueue (Async a) -> TBQueue a -> IO ()
orderJobs jobs resQ = forever $ do
runningJob <- atomically $ readTBQueue jobs
res <- wait runningJob
atomically $ writeTBQueue resQ res
mkPipeline :: MonadIO m =>
Natural -- ^ Bound for all queues
-> (a -> IO b) -- ^ Job to produce outputs from inputs
-> (b -> IO ()) -- ^ Job to make use of outputs
-> m (a -> m (), m ()) -- ^ Procedure for putting values onto the pipeline and cancellation
mkPipeline n job yield = do
ins <- liftIO $ newTBQueueIO n
jobs <- liftIO $ newTBQueueIO n
outs <- liftIO $ newTBQueueIO n
plumbing <- liftIO $ traverse async $
[ runJobs ins jobs job
, orderJobs jobs outs
, forever $ do
out <- atomically $ readTBQueue outs
yield out
]
return $
( liftIO . atomically . writeTBQueue ins
, liftIO . void $ traverse cancel plumbing
)
launchNukes :: MonadIO m => m ()
launchNukes = do
(put, stop) <- mkPipeline 3 job putStrLn
let loop = do
n <- liftIO $ read <$> getLine
case n :: Int of
0 -> stop
_ -> do
put n
loop
in loop
where
job n = do
threadDelay $ n * 1000000
putStrLn $ "Squaring number " <> show n
return $ "Number squared: " <> (show $ n * n)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment