Created
June 25, 2014 03:48
-
-
Save mwotton/558dd7a2237c2c736c41 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE ScopedTypeVariables #-} | |
module Net.LambdaSpider.RateBalancer where | |
import Data.IORef (atomicModifyIORef', newIORef) | |
import Data.IORef (IORef) | |
import Data.Time.Clock | |
import Pipes | |
import Pipes.Concurrent | |
data Decision = Kill | Continue | Fork | |
type Strategy c a = (c, a -> NominalDiffTime -> c -> (c,Decision)) | |
rateBalance (initial, updateState) seed pipe = do | |
(outputTotal, inputTotal) <- spawn Unbounded | |
(outDomains, inDomains) <- spawn Single | |
(state :: IORef c) <- newIORef initial | |
forkIO $ do | |
runEffect (seed >-> toOutput outDomains) | |
performGC | |
forkIO $ do | |
runEffect $ terminator state inDomains outputTotal | |
>-> toOutput outputTotal | |
performGC | |
return $ fromInput inputTotal | |
where terminator state inDomains outputTotal = do | |
before <- lift getCurrentTime | |
x <- lift $ next (fromInput inDomains >-> pipe) | |
after <- lift getCurrentTime | |
let diff = after `diffUTCTime` before | |
case x of | |
Left () -> return () | |
Right (a, _) -> do | |
yield a | |
(newstate,decision) <- liftIO $ atomicModifyIORef' state | |
(\st -> let (newst,decision) = updateState a diff st | |
in (newst, (newst,decision))) | |
case decision of | |
Continue -> terminator state inDomains outputTotal | |
Kill -> return () | |
Fork -> do | |
liftIO $ forkIO $ runEffect (terminator state inDomains outputTotal >-> toOutput outputTotal) | |
terminator state inDomains outputTotal | |
-- this is a terrible strategy. | |
steadyStrat :: Strategy () a | |
steadyStrat = ((), | |
\res difftime () -> ((),Continue)) | |
minMaxStrat min max = ((min, max,1), | |
\res difftime st -> decision st) | |
where decision (min,max,threads) | |
| threads > max = ((min,max,threads-1), Kill) | |
| threads < min = ((min,max,threads+1), Fork) | |
| otherwise = ((min,max,threads), Continue) | |
-- minMaxStrat min max = (min,max, |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Net.LambdaSpider.RateBalancerSpec where | |
import Net.LambdaSpider.RateBalancer | |
import Pipes | |
import qualified Pipes.Prelude as PP | |
import Test.Hspec | |
spec = describe "simple case" $ do | |
it "gets everything back with a simple strat" $ do | |
-- prod <- rateBalance steadyStrat (mapM_ yield [1..10]) | |
prod <- rateBalance steadyStrat (mapM_ yield [1..10]) | |
(await >>= \x -> yield (show x)) | |
PP.toListM prod `shouldReturn` (map show [1..10]) | |
it "gets everything back with a minmax strat" $ do | |
-- prod <- rateBalance steadyStrat (mapM_ yield [1..10]) | |
prod <- rateBalance (minMaxStrat 3 5) (mapM_ yield [1..10]) | |
(await >>= \x -> yield (show x)) | |
PP.toListM prod `shouldReturn` (map show [1..10]) | |
--runEffect $ prod >-> PP.print |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment