Skip to content

Instantly share code, notes, and snippets.

@4e6
Created October 17, 2018 09:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save 4e6/6519088912ed4fda537f731de5538b8e to your computer and use it in GitHub Desktop.
Save 4e6/6519088912ed4fda537f731de5538b8e to your computer and use it in GitHub Desktop.
parallel map with timeout
$ cabal new-exec racer
Start
TimedOut [ThreadId 12,ThreadId 13,ThreadId 14,ThreadId 15,ThreadId 16,ThreadId 17]
Finish
-- ^ program executed 7/10 tasks and exited by timeout
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NumDecimals #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE PartialTypeSignatures #-}
module Main where
import qualified Control.Concurrent
import qualified Control.Concurrent.Async as Async
import qualified Control.Concurrent.Async.Pool as Async.Pool
import qualified Control.Concurrent.STM as STM
import qualified Control.Concurrent.STM.TQueue as STM.TQueue
import Control.Monad
class Job a where
type Out a
run :: a -> Out a
newtype SleepJob = SleepSeconds Double
instance Job SleepJob where
type Out SleepJob = IO Control.Concurrent.ThreadId
run (SleepSeconds i) = do
Control.Concurrent.threadDelay (truncate $ i * 1e6)
Control.Concurrent.myThreadId
data QueueSleepJob a = QueueSleepSeconds Double (STM.TQueue.TQueue a)
instance Job (QueueSleepJob Control.Concurrent.ThreadId) where
type Out (QueueSleepJob Control.Concurrent.ThreadId) = IO ()
run (QueueSleepSeconds i q) = do
Control.Concurrent.threadDelay (truncate $ i * 1e6)
t <- Control.Concurrent.myThreadId
STM.atomically $ STM.TQueue.writeTQueue q t
data TimedResult a
= TimedOut a
| Finished a
deriving (Eq, Ord, Show)
fromEither :: (a -> c) -> (b -> c) -> Either a b -> TimedResult c
fromEither l r = either (TimedOut . l) (Finished . r)
mapConcurrentlyTimed :: Traversable t => Int -> (a -> IO b) -> t a -> IO (TimedResult [b])
mapConcurrentlyTimed delayMicros f xs = do
q <- STM.TQueue.newTQueueIO
let
exec = f >=> STM.atomically . STM.TQueue.writeTQueue q
action = Async.mapConcurrently exec xs
delay = Control.Concurrent.threadDelay delayMicros
raceResult <- Async.race delay action
results <- STM.atomically $ STM.TQueue.flushTQueue q
pure $ fromEither (const results) (const results) raceResult
mapTasksTimed :: Traversable t => Int -> Async.Pool.TaskGroup -> (a -> IO b) -> t a -> IO (TimedResult [b])
mapTasksTimed delayMicros g f xs = do
q <- STM.TQueue.newTQueueIO
let
exec = f >=> STM.atomically . STM.TQueue.writeTQueue q
action = Async.Pool.mapTasks g (fmap exec xs)
delay = Control.Concurrent.threadDelay delayMicros
raceResult <- Async.Pool.withTaskGroup 2 $ \g' ->
Async.Pool.race g' delay action
results <- STM.atomically $ STM.TQueue.flushTQueue q
pure $ fromEither (const results) (const results) raceResult
main :: IO ()
main = do
putStrLn "Start"
let
jobs = replicate 10 (SleepSeconds 0.1)
result <- Async.Pool.withTaskGroup 3 $ \g ->
mapTasksTimed 200000 g run jobs
print result
putStrLn "Finish"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment