Create a gist now

Instantly share code, notes, and snippets.

@osa1 /Main.hs
Last active Nov 6, 2017

What would you like to do?
parallel scheduler
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeFamilies #-}
module Main where
--------------------------------------------------------------------------------
import Control.Concurrent.Async.Lifted.Safe
import Control.Concurrent.MVar.Lifted
import Control.Concurrent.Timeout
import Control.Exception.Lifted
import Control.Monad
import Control.Monad.Logger.CallStack hiding (defaultLogStr)
import Control.Monad.Trans.Control
import Data.Function
import Data.IORef
import Data.Monoid
import Data.Ord
import qualified Data.Set as S
import qualified Data.Text as T
import Hedgehog
import qualified Hedgehog.Gen as Gen
import qualified Hedgehog.Range as Range
import System.Log.FastLogger
--------------------------------------------------------------------------------
newtype Unique = Unique Int
deriving (Eq, Ord)
--------------------------------------------------------------------------------
data Resource = Resource
{ _resourceName :: T.Text
, _resourceId :: Unique
, _resourceLock :: MVar ()
}
instance Eq Resource where
(==) = (==) `on` _resourceId
instance Ord Resource where
compare = comparing _resourceId
instance Show Resource where
show = T.unpack . _resourceName
--------------------------------------------------------------------------------
withResources :: (MonadLogger m, MonadBaseControl IO m) => S.Set Resource -> m () -> m ()
withResources locks a = acquire_locks (S.toList locks)
where
acquire_locks ls = case ls of
[] ->
a
l : ls' -> do
logDebug ("taking lock " <> (_resourceName l))
withMVar (_resourceLock l) $ \() ->
acquire_locks ls'
--------------------------------------------------------------------------------
newtype Task = Task { runTask :: forall m . (MonadLogger m, MonadBaseControl IO m) => m () }
mkFastTask :: Int -> S.Set Resource -> Task
mkFastTask i res =
Task $ withResources res $ do
logDebug ("Performing " <> T.pack (show i))
threadDelay (500 :: Milliseconds)
logDebug ("Fast task done (" <> T.pack (show i) <> ")")
mkSlowTask :: Int -> S.Set Resource -> Task
mkSlowTask i res =
Task $ withResources res $ do
logDebug ("Performing " <> T.pack (show i))
threadDelay (3 :: Seconds)
logDebug ("Slow task done (" <> T.pack (show i) <> ")")
mkCrashingTask :: Int -> S.Set Resource -> Task
mkCrashingTask i res =
Task $ withResources res $ do
logDebug ("Performing " <> T.pack (show i))
error "task failed"
--------------------------------------------------------------------------------
newtype UniqueGen = UniqueGen (IORef Int)
mkUniqGen :: IO UniqueGen
mkUniqGen = UniqueGen <$> newIORef 0
mkUniq :: UniqueGen -> IO Unique
mkUniq (UniqueGen ref) = atomicModifyIORef' ref (\i -> (i + 1, Unique i))
genResources :: Int -> IO [Resource]
genResources n =
forM [ 0 .. n ] $ \i -> do
lock <- newMVar () -- initially full
return $ Resource
{ _resourceName = T.pack ("resource" ++ show i)
, _resourceId = Unique i
, _resourceLock = lock
}
genTaskRes :: MonadGen m => [Resource] -> Int -> m [S.Set Resource]
genTaskRes res n = replicateM n (genTaskRes' res)
genTaskRes' :: MonadGen m => [Resource] -> m (S.Set Resource)
genTaskRes' res = Gen.set (Range.linear 1 (length res)) (Gen.element res)
--------------------------------------------------------------------------------
main :: IO ()
main = do
res0 <- genResources 8
deps <- Gen.sample (genTaskRes res0 10)
withFastLogger (LogStdout defaultBufSize) $ \fast_logger -> do
flip runLoggingT (mkDefaultLogger fast_logger LevelDebug) $ do
thrs <- forM (zip3 [ 0 .. ] deps (cycle [mkFastTask, mkSlowTask, mkCrashingTask])) $ \(i, res, task) ->
async (runTask (task i res) `catch` (\(_ :: SomeException) -> logDebug "Task failed"))
forM_ thrs wait
mkDefaultLogger :: FastLogger -> LogLevel -> Loc -> LogSource -> LogLevel -> LogStr -> IO ()
mkDefaultLogger logger logLvl loc src msgLvl str
| logLvl <= msgLvl = logger (defaultLogStr loc src msgLvl str)
| otherwise = return ()
defaultLogStr :: Loc -> LogSource -> LogLevel -> LogStr -> LogStr
defaultLogStr _ _ _ s = s <> "\n"
$ stack exec scheduler
taking lock resource5
Performing 0
taking lock resource0
Performing 1
taking lock resource2
taking lock resource6
taking lock resource7
Performing 2
Task failed
taking lock resource6
Performing 3
taking lock resource8
Performing 4
taking lock resource1
taking lock resource2
Performing 5
Task failed
taking lock resource2
taking lock resource3
taking lock resource8
taking lock resource0
taking lock resource3
taking lock resource4
Performing 9
Fast task done (3)
Fast task done (9)
Fast task done (0)
Slow task done (1)
taking lock resource4
taking lock resource8
Slow task done (4)
Performing 6
Fast task done (6)
taking lock resource7
Performing 8
Task failed
Performing 7
Slow task done (7)
name: scheduler
version: 0.1.0.0
license: BSD3
author: Ömer Sinan Ağacan
maintainer: omeragacan@gmail.com
copyright: 2017 Ömer Sinan Ağacan
category: Web
build-type: Simple
cabal-version: >=1.10
executable scheduler
main-is: Main.hs
build-depends:
base >= 4.7 && < 5,
bytestring,
containers,
fast-logger,
hedgehog,
lifted-async,
lifted-base,
monad-control,
monad-logger,
text,
typed-duration
default-language: Haskell2010
ghc-options: -O2 -Wall -threaded -rtsopts
resolver: lts-9.12
packages:
- .
extra-deps:
- typed-duration-0.1.1.0
- hedgehog-0.5
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment