-- Rate-limiting version of Stream.hs
-- (GitHub gist lylek/58d9307431c9dbee6505bd588b8dfbc9, last active December 17, 2018 05:18.)
{-# LANGUAGE BangPatterns, CPP #-} | |
{-# OPTIONS_GHC -fno-warn-name-shadowing -fwarn-unused-imports #-} | |
-- -Wall | |
-- This is a rate-limiting version of https://github.com/simonmar/parconc-examples/blob/master/Stream.hs, | |
-- as per the exercise on page 69 of Parallel and Concurrent Programming in Haskell. | |
-- A module for stream processing built on top of Control.Monad.Par | |
-- (In the future may want to look into the stream interface used by | |
-- the stream fusion framework.) | |
module StreamFork
  ( Stream
  , streamFromList
  , streamMap
  , streamFold
  , streamFilter
  ) where
import Control.Monad.Par.Scheds.Trace as P | |
import Control.DeepSeq | |
-------------------------------------------------------------------------------- | |
-- Types | |
-- <<IList | |
-- | One cell of a stream. The tail of a 'Cons' lives in an 'IVar', so a
-- consumer can start reading before the producer has finished writing.
data IList a
  = Nil
    -- ^ End of the stream.
  | Cons a (IVar (IList a))
    -- ^ An element followed by the (possibly not yet filled) rest of the stream.
  | Fork (Par ()) (IList a)
    -- ^ A cell wrapped together with a computation that a consumer is expected
    -- to 'fork'; the producers in this module use it to carry the work that
    -- fills in the next chunk.
-- | A stream is an 'IVar' that will eventually hold the first 'IList' cell.
type Stream a = IVar (IList a)
-- >> | |
-- | Forces the element of a 'Cons' and the cell wrapped by a 'Fork'. The
-- computation stored in a 'Fork' is left untouched.
instance NFData a => NFData (IList a) where
  rnf Nil        = ()
  rnf (Cons a b) = rnf a `seq` rnf b
  rnf (Fork _ b) = rnf b
-- ----------------------------------------------------------------------------- | |
-- Stream operators | |
-- Is the producer supposed to generate ILists only of length chunkLength + 1, ending in | |
-- Nil? So that the consumer must know to append the result of the forked computation on | |
-- reaching Nil? That seems awkward for the consumer: | |
-- | |
-- streamFromList 4 2 [0..8] | |
-- f0 = Cons 0 $ Cons 1 $ Fork f1 $ Cons 2 $ Cons 3 Nil | |
-- f1 = Cons 4 $ Cons 5 $ Fork f2 $ Cons 6 $ Cons 7 Nil | |
-- f2 = Cons 8 $ Nil | |
-- Or can the producer somehow tie the knot, so that the last IVar in the chunk is | |
-- actually the first IVar from the forked computation? | |
-- Well, note that the forked computation returns unit, not an IList or Stream. So | |
-- there's nothing for the consumer to get from the forked computation. Thus it must be | |
-- that the forked computation just fills in more of the same IList. | |
-- Note that chunkSize must be > 0 and 0 <= forkDistance < chunkSize | |
-- <<streamFromList | |
-- | Turn a list into a stream that is produced one chunk at a time.
--
-- Each chunk holds up to @chunkSize@ elements. The element at index
-- @forkDistance@ within a chunk is wrapped in 'Fork', carrying the computation
-- that produces the following chunk. The last cell of a chunk points at an
-- 'IVar' that only that computation fills, so nothing beyond the current chunk
-- is produced until a consumer forks it.
--
-- Precondition: @chunkSize > 0@ and @0 <= forkDistance < chunkSize@. Outside
-- that range a chunk may end without ever offering a 'Fork', leaving its tail
-- unfilled.
streamFromList :: NFData a => Int -> Int -> [a] -> Par (Stream a)
streamFromList chunkSize forkDistance xs = do
  var <- new
  nextChunk <- new
  fork $ loop 0 xs var nextChunk
  return var
  where
    -- Index of the final element of a chunk.
    lastIndex = chunkSize - 1

    -- @loop i ys var nextChunk@ writes the remainder of the current chunk,
    -- starting at in-chunk index @i@, into @var@; @nextChunk@ is where the
    -- following chunk will begin.
    loop :: NFData a => Int -> [a] -> Stream a -> Stream a -> Par ()
    loop _ [] var _ = put var Nil
    loop i (x:rest) var nextChunk
      | i == forkDistance = do
          nextNextChunk <- new
          -- Skip what is left of this chunk; the next chunk starts after it.
          let calcNextChunk =
                loop 0 (drop (lastIndex - forkDistance) rest) nextChunk nextNextChunk
          if i == lastIndex
            -- The fork point is also the last element: close the chunk here.
            -- Without this check the index would run past lastIndex and the
            -- whole list would be produced eagerly.
            then put var (Fork calcNextChunk (Cons x nextChunk))
            else do
              newtl <- new
              put var (Fork calcNextChunk (Cons x newtl))
              loop (i + 1) rest newtl nextChunk
      | i == lastIndex = put var (Cons x nextChunk)
      | otherwise = do
          newtl <- new
          put var (Cons x newtl)
          loop (i + 1) rest newtl nextChunk
-- >> | |
-- <<streamMap | |
-- | Apply a function to every element of a stream, producing the output in
-- rate-limited chunks.
--
-- The output is chunked exactly as in 'streamFromList': up to @chunkSize@
-- elements per chunk, with the element at index @forkDistance@ wrapped in a
-- 'Fork' whose computation produces the next output chunk. Any 'Fork' met in
-- the input is forked so the upstream producer keeps going.
--
-- Precondition: @chunkSize > 0@ and @0 <= forkDistance < chunkSize@.
streamMap :: (NFData a, NFData b) => Int -> Int -> (a -> b) -> Stream a -> Par (Stream b)
streamMap chunkSize forkDistance fn instrm = do
  outstrm <- new
  nextChunk <- new
  fork $ loop 0 instrm outstrm nextChunk
  return outstrm
  where
    -- Index of the final element of a chunk.
    lastIndex = chunkSize - 1

    -- @loop i src dst nextChunk@ maps the remainder of the current chunk,
    -- starting at in-chunk index @i@, from @src@ into @dst@.
    loop i src dst nextChunk = get src >>= processElement
      where
        processElement Nil = put dst Nil
        processElement (Fork producer rest) = do
          fork producer
          processElement rest
        processElement (Cons h t)
          | i == forkDistance = do
              nextNextChunk <- new
              -- The next chunk reads the input from just past the end of the
              -- current chunk.
              let calcNextChunk = do
                    src' <- dropN (lastIndex - forkDistance) t
                    loop 0 src' nextChunk nextNextChunk
              if i == lastIndex
                -- The fork point is also the last element: close the chunk
                -- here instead of running past the chunk boundary.
                then put dst (Fork calcNextChunk (Cons (fn h) nextChunk))
                else do
                  newtl <- new
                  put dst (Fork calcNextChunk (Cons (fn h) newtl))
                  loop (i + 1) t newtl nextChunk
          | i == lastIndex = put dst (Cons (fn h) nextChunk)
          | otherwise = do
              newtl <- new
              put dst (Cons (fn h) newtl)
              loop (i + 1) t newtl nextChunk

    -- Skip @n@ elements of a stream, returning the stream that follows them.
    -- A 'Fork' is looked through without being forked; the 'loop' that maps
    -- those same elements is the one that forks it. If the stream ends early,
    -- a fresh stream holding just 'Nil' is returned.
    dropN :: NFData c => Int -> Stream c -> Par (Stream c)
    dropN n str
      | n <= 0 = return str
      | otherwise = get str >>= skip
      where
        skip Nil = do
          nv <- new
          put nv Nil
          return nv
        skip (Cons _ tl) = dropN (n - 1) tl
        skip (Fork _ tl) = skip tl
-- >> | |
-- <<streamFold
-- | Reduce a stream to a single value with a strict left fold. This function
-- does not return until it reaches the end of the stream.
--
-- A 'Fork' met along the way has its computation forked, so the producer
-- keeps generating the next chunk. The one exception is a 'Fork' that wraps
-- 'Nil': the stream is over, so nothing is forked.
streamFold :: (a -> b -> a) -> a -> Stream b -> Par a
streamFold fn !acc instrm = get instrm >>= step
  where
    -- Handles one cell, looking through any number of nested 'Fork' wrappers
    -- so that every shape of 'IList' is covered.
    step Nil            = return acc
    step (Cons h t)     = streamFold fn (fn acc h) t
    step (Fork _ Nil)   = return acc
    step (Fork p inner) = do
      fork p
      step inner
-- >> | |
-- | Keep only the elements of a stream that satisfy the predicate.
--
-- A 'Fork' in the input has its computation forked, so the upstream producer
-- keeps going, and filtering continues with the cell it wraps. Without that
-- case, any stream built by 'streamFromList' or 'streamMap' would hit a
-- pattern-match failure at its first 'Fork'. The output stream itself
-- contains no 'Fork' cells.
streamFilter :: NFData a => (a -> Bool) -> Stream a -> Par (Stream a)
streamFilter p instr = do
  outstr <- new
  fork $ loop instr outstr
  return outstr
  where
    -- @loop src dst@ copies the accepted elements of @src@ into @dst@.
    loop src dst = get src >>= step
      where
        step Nil = put dst Nil
        step (Fork producer rest) = do
          fork producer
          step rest
        step (Cons x src')
          | p x = do
              tl <- new
              -- put_ forces only to weak head normal form.
              put_ dst (Cons x tl)
              loop src' tl
          | otherwise = loop src' dst
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment