Skip to content

Instantly share code, notes, and snippets.

@mwotton
Last active August 29, 2015 14:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mwotton/d6ce537eec3115b2d6cf to your computer and use it in GitHub Desktop.
Save mwotton/d6ce537eec3115b2d6cf to your computer and use it in GitHub Desktop.
attempt at a rate balancer in pipes
{-# LANGUAGE ScopedTypeVariables #-}
module Net.LambdaSpider.RateBalancer where
import Data.IORef (atomicModifyIORef', newIORef)
import Data.IORef (IORef)
import Data.Time.Clock
import Pipes
import Pipes.Concurrent
data Decision = Kill | Continue | Fork
type Strategy c a = (c, a -> NominalDiffTime -> c -> (c,Decision))
rateBalance (initial, updateState) seed pipe = do
(outputTotal, inputTotal) <- spawn Unbounded
(outDomains, inDomains) <- spawn Single
(state :: IORef c) <- newIORef initial
forkIO $ do
runEffect (seed >-> toOutput outDomains)
performGC
forkIO $ do
runEffect $ terminator state inDomains outputTotal
>-> toOutput outputTotal
performGC
return $ fromInput inputTotal
where terminator state inDomains outputTotal = do
before <- lift getCurrentTime
x <- lift $ next (fromInput inDomains >-> pipe)
after <- lift getCurrentTime
let diff = after `diffUTCTime` before
case x of
Left () -> return ()
Right (a, _) -> do
yield a
(newstate,decision) <- liftIO $ atomicModifyIORef' state
(\st -> let (newst,decision) = updateState a diff st
in (newst, (newst,decision)))
case decision of
Continue -> terminator state inDomains outputTotal
Kill -> return ()
Fork -> do
liftIO $ forkIO $ do
runEffect (terminator state inDomains outputTotal >-> toOutput outputTotal)
performGC
terminator state inDomains outputTotal
-- this is a terrible strategy.
steadyStrat :: Strategy () a
steadyStrat = ((),
\res difftime () -> ((),Continue))
minMaxStrat min max = ((min, max,1),
\res difftime st -> decision st)
where decision (min,max,threads)
| threads > max = ((min,max,threads-1), Kill)
| threads < min = ((min,max,threads+1), Fork)
| otherwise = ((min,max,threads), Continue)
-- minMaxStrat min max = (min,max,
module Net.LambdaSpider.RateBalancerSpec where
import Net.LambdaSpider.RateBalancer
import Pipes
import qualified Pipes.Prelude as PP
import Test.Hspec
spec = describe "simple case" $ do
it "gets everything back with a simple strat" $ do
prod <- rateBalance steadyStrat (mapM_ yield [1..10])
(await >>= \x -> yield (show x))
PP.toListM prod `shouldReturn` (map show [1..10])
it "gets everything back with a minmax strat" $ do
prod <- rateBalance (minMaxStrat 30 50) (mapM_ yield [1..10000])
(await >>= \x -> yield (show x))
PP.toListM prod `shouldReturn` (map show [1..10000])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment