Last active
August 29, 2015 14:02
-
-
Save mwotton/d6ce537eec3115b2d6cf to your computer and use it in GitHub Desktop.
attempt at a rate balancer in pipes
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE ScopedTypeVariables #-} | |
module Net.LambdaSpider.RateBalancer where | |
import Data.IORef (atomicModifyIORef', newIORef) | |
import Data.IORef (IORef) | |
import Data.Time.Clock | |
import Pipes | |
import Pipes.Concurrent | |
data Decision = Kill | Continue | Fork | |
type Strategy c a = (c, a -> NominalDiffTime -> c -> (c,Decision)) | |
rateBalance (initial, updateState) seed pipe = do | |
(outputTotal, inputTotal) <- spawn Unbounded | |
(outDomains, inDomains) <- spawn Single | |
(state :: IORef c) <- newIORef initial | |
forkIO $ do | |
runEffect (seed >-> toOutput outDomains) | |
performGC | |
forkIO $ do | |
runEffect $ terminator state inDomains outputTotal | |
>-> toOutput outputTotal | |
performGC | |
return $ fromInput inputTotal | |
where terminator state inDomains outputTotal = do | |
before <- lift getCurrentTime | |
x <- lift $ next (fromInput inDomains >-> pipe) | |
after <- lift getCurrentTime | |
let diff = after `diffUTCTime` before | |
case x of | |
Left () -> return () | |
Right (a, _) -> do | |
yield a | |
(newstate,decision) <- liftIO $ atomicModifyIORef' state | |
(\st -> let (newst,decision) = updateState a diff st | |
in (newst, (newst,decision))) | |
case decision of | |
Continue -> terminator state inDomains outputTotal | |
Kill -> return () | |
Fork -> do | |
liftIO $ forkIO $ do | |
runEffect (terminator state inDomains outputTotal >-> toOutput outputTotal) | |
performGC | |
terminator state inDomains outputTotal | |
-- this is a terrible strategy. | |
steadyStrat :: Strategy () a | |
steadyStrat = ((), | |
\res difftime () -> ((),Continue)) | |
minMaxStrat min max = ((min, max,1), | |
\res difftime st -> decision st) | |
where decision (min,max,threads) | |
| threads > max = ((min,max,threads-1), Kill) | |
| threads < min = ((min,max,threads+1), Fork) | |
| otherwise = ((min,max,threads), Continue) | |
-- minMaxStrat min max = (min,max, |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Net.LambdaSpider.RateBalancerSpec where | |
import Net.LambdaSpider.RateBalancer | |
import Pipes | |
import qualified Pipes.Prelude as PP | |
import Test.Hspec | |
spec = describe "simple case" $ do | |
it "gets everything back with a simple strat" $ do | |
prod <- rateBalance steadyStrat (mapM_ yield [1..10]) | |
(await >>= \x -> yield (show x)) | |
PP.toListM prod `shouldReturn` (map show [1..10]) | |
it "gets everything back with a minmax strat" $ do | |
prod <- rateBalance (minMaxStrat 30 50) (mapM_ yield [1..10000]) | |
(await >>= \x -> yield (show x)) | |
PP.toListM prod `shouldReturn` (map show [1..10000]) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment