Skip to content

Instantly share code, notes, and snippets.

@raymondtay
Last active November 4, 2018 13:06
Show Gist options
  • Save raymondtay/e7c58b707ac002c3ebb281e7ee577d32 to your computer and use it in GitHub Desktop.
Save raymondtay/e7c58b707ac002c3ebb281e7ee577d32 to your computer and use it in GitHub Desktop.
Rate Limiting Producer using Par Monad Direct Scheduler
module RateLimitingStream (
Stream(..),
foldS,
streamFromList
) where
import System.Environment
import Control.Monad.IO.Class
import Control.Monad.Par
import Control.DeepSeq
import GHC.Generics (Generic)
data IList a = Nil
| Cons a (IVar (IList a))
| Fork (Par ()) (IList a) deriving (Generic)
type Stream a = IVar (IList a)
-- NFData defines functions which allows me/us to define how to evaluate
-- the data values i/we have so that it reduces to "normal form".
--
instance NFData a => NFData (IList a) where
rnf Nil = ()
rnf (Cons a b) = rnf a `seq` rnf b
rnf (Fork p a) = seq p (rnf a)
streamFromList :: NFData a => Int -> [a] -> Par (Stream a)
streamFromList n xs = do
v <- new
fork $ consume n n xs v
return v
where
consume :: NFData a => Int -> Int -> [a] -> IVar (IList a) -> Par ()
consume _ _ [] var = put var Nil
consume 0 n (x:xs) var = do
tail <- new
let parFn = consume n n xs tail -- par function created when the counter reaches 0 => new chunk is needed.
put var (Fork parFn (Cons x tail))
consume counter n (x:xs) var = do
tail <- new
put var (Cons x tail) -- builds the list as "counter /= 0"
consume (counter - 1) n xs tail
-- folding operation on a Stream that is parallelizable, what will happen is
-- that the folding proceeds as usual till the point "foldS" meets a "Fork"
-- which will fork a parallel computation.
foldS :: (a -> b -> a) -> a -> Stream b -> Par a
foldS fn !acc inS = do
ilist <- get inS
case ilist of
Nil -> return acc
Fork op (Cons h t) -> do
fork op
foldS fn (fn acc h) t
Cons h t -> foldS fn (fn acc h) t
mapS :: NFData b => (a -> b) -> Stream a -> Par (Stream b)
mapS fn inS = do
outS <- new
fork $ consume inS outS
return outS
where
consume inS outS = do
ilist <- get inS
case ilist of
Nil -> put outS Nil
Fork op (Cons h t) -> fork op >> do
newtail <- new
put outS (Cons (fn h) newtail)
consume t newtail
Cons h t -> do
newtail <- new
put outS (Cons (fn h) newtail)
consume t newtail
-- Filtering applies a predicate to the Stream and builds a Stream
-- containing all those elements that the predicate likes while discarding
-- those elements which failed the predicates.
--
filterS :: NFData a => (a -> Bool) -> Stream a -> Par (Stream a)
filterS f inS = do
outS <- new
fork $ consume inS outS
return outS
where
consume inS outS = do
ilist <- get inS
case ilist of
Nil -> put outS Nil
Fork op (Cons h t) ->
fork op >>
if f h then
do
tail <- new
put outS (Cons h tail)
consume t tail
else consume t outS
Cons h t
| f h -> do
tl <- new
put outS (Cons h tl)
consume t tl
| otherwise -> do
consume t outS
allLTOne :: Par (Stream Int)
allLTOne =
let xs = streamFromList 10 [1..1000]
in filterS (<1) (runPar xs)
allBetween101and201 :: Par (Stream Int)
allBetween101and201 =
let xs = streamFromList 10 [1..1000]
in filterS (\e -> (e>101) && (e<201)) (runPar xs)
test :: Int -> Int -> Par Int
test chunkSize eleS =
let es = [1..eleS]
xs = streamFromList chunkSize es
ys = mapS (+1) (runPar xs)
in foldS (+) 0 (runPar ys)
main :: IO ()
main = do
[chunkSize, elementsToGen] <- fmap (fmap read) getArgs
sum <- runParIO (test chunkSize elementsToGen)
putStrLn $ "The sum is : " ++ show sum
result <- runParIO allLTOne
case (runPar $ get result) of
Nil -> putStrLn "No elements LT 1 found."
_ -> putStrLn $ "Elements LT 1 found. "
result2 <- runParIO allBetween101and201
case (runPar $ get result2) of
Nil -> putStrLn "No elements > 101 and < 201 found."
_ -> putStrLn $ "Elements > 101 and < 201 found. "
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment