Skip to content

Instantly share code, notes, and snippets.

@lylek
Last active December 17, 2018 05:18
Show Gist options
  • Save lylek/58d9307431c9dbee6505bd588b8dfbc9 to your computer and use it in GitHub Desktop.
Save lylek/58d9307431c9dbee6505bd588b8dfbc9 to your computer and use it in GitHub Desktop.
Rate-limiting version of Stream.hs
{-# LANGUAGE BangPatterns, CPP #-}
{-# OPTIONS_GHC -fno-warn-name-shadowing -fwarn-unused-imports #-}
-- -Wall
-- This is a rate-limiting version of https://github.com/simonmar/parconc-examples/blob/master/Stream.hs,
-- as per the exercise on page 69 of Parallel and Concurrent Programming in Haskell.
-- A module for stream processing built on top of Control.Monad.Par
-- (In the future may want to look into the stream interface used by
-- the stream fusion framework.)
module StreamFork
(
Stream, streamFromList, streamMap, streamFold, streamFilter
) where
import Control.Monad.Par.Scheds.Trace as P
import Control.DeepSeq
--------------------------------------------------------------------------------
-- Types
-- <<IList
-- | One cell of a stream that is filled in incrementally.
data IList a
  = Nil
    -- ^ End of the stream.
  | Cons a (IVar (IList a))
    -- ^ An element, followed by the 'IVar' that will hold the rest.
  | Fork (Par ()) (IList a)
    -- ^ A cell paired with a 'Par' computation. The consumers in this
    -- module 'fork' that computation when they reach the cell.

-- | A stream is the 'IVar' that holds its first cell.
type Stream a = IVar (IList a)
-- >>
-- | Forces the element of a 'Cons' and delegates to the 'NFData' instance
-- of the tail 'IVar'. The computation stored in a 'Fork' is not inspected;
-- only the cell it wraps is forced.
instance NFData a => NFData (IList a) where
  rnf cell = case cell of
    Nil -> ()
    Cons hd tl -> rnf hd `seq` rnf tl
    Fork _ inner -> rnf inner
-- -----------------------------------------------------------------------------
-- Stream operators
-- Is the producer supposed to generate ILists only of length chunkLength + 1, ending in
-- Nil? So that the consumer must know to append the result of the forked computation on
-- reaching Nil? That seems awkward for the consumer:
--
-- streamFromList 4 2 [0..8]
-- f0 = Cons 0 $ Cons 1 $ Fork f1 $ Cons 2 $ Cons 3 Nil
-- f1 = Cons 4 $ Cons 5 $ Fork f2 $ Cons 6 $ Cons 7 Nil
-- f2 = Cons 8 $ Nil
-- Or can the producer somehow tie the knot, so that the last IVar in the chunk is
-- actually the first IVar from the forked computation?
-- Well, note that the forked computation returns unit, not an IList or Stream. So
-- there's nothing for the consumer to get from the forked computation. Thus it must be
-- that the forked computation just fills in more of the same IList.
--
-- Preconditions: chunkSize must be > 0, and 0 <= forkDistance < chunkSize.
-- <<streamFromList
-- | Turn a list into a rate-limited stream.
--
-- A producer is forked that writes at most @chunkSize@ cells and then
-- stops. The cell at index @forkDistance@ of every chunk is wrapped in
-- 'Fork', carrying the computation that produces the following chunk; the
-- stream only grows past the current chunk once a consumer forks it.
--
-- Expects @chunkSize > 0@ and @0 <= forkDistance < chunkSize@.
streamFromList :: NFData a => Int -> Int -> [a] -> Par (Stream a)
streamFromList chunkSize forkDistance xs = do
  var <- new
  nextChunk <- new
  fork $ loop 0 xs var nextChunk
  return var
  where
    -- loop i remaining var nextChunk: write cell number i of the current
    -- chunk into var; nextChunk is where the following chunk will start.
    loop :: NFData a => Int -> [a] -> Stream a -> Stream a -> Par ()
    loop _ [] var _ = put var Nil
    loop i (x:rest) var nextChunk = do
      let isLast = i == chunkSize - 1
      -- The last cell of a chunk links to the next chunk's head; any
      -- other cell gets a fresh tail.
      tl <- if isLast then return nextChunk else new
      -- "Last cell" and "fork cell" are decided independently, so the
      -- case forkDistance == chunkSize - 1 still ends the chunk here
      -- instead of letting the producer run on to the end of the list.
      cell <-
        if i == forkDistance
          then do
            nextNextChunk <- new
            -- The next chunk starts after the cells of this chunk that
            -- are still to be written.
            let calcNextChunk =
                  loop 0 (drop (chunkSize - forkDistance - 1) rest)
                       nextChunk nextNextChunk
            return (Fork calcNextChunk (Cons x tl))
          else return (Cons x tl)
      put var cell
      if isLast then return () else loop (i + 1) rest tl nextChunk
-- >>
-- <<streamMap
-- | Apply @fn@ to every element of a stream, producing a rate-limited
-- output stream.
--
-- The output is produced in chunks of at most @chunkSize@ cells. The cell
-- at index @forkDistance@ of every chunk is wrapped in 'Fork', carrying
-- the computation that maps the following chunk. 'Fork' cells found in the
-- input are forked as they are reached and do not appear in the output.
--
-- Expects @chunkSize > 0@ and @0 <= forkDistance < chunkSize@.
streamMap :: (NFData a, NFData b) => Int -> Int -> (a -> b) -> Stream a -> Par (Stream b)
streamMap chunkSize forkDistance fn instrm = do
  outstrm <- new
  nextChunk <- new
  fork $ loop 0 instrm outstrm nextChunk
  return outstrm
  where
    -- loop i instrm outstrm nextChunk: map the input cell in instrm into
    -- output cell number i of the current chunk; nextChunk is where the
    -- following output chunk will start.
    loop i instrm outstrm nextChunk = get instrm >>= processElement
      where
        processElement ilst =
          case ilst of
            Nil -> put outstrm Nil
            Cons h t -> do
              let isLast = i == chunkSize - 1
              -- The last cell of a chunk links to the next chunk's head.
              tl <- if isLast then return nextChunk else new
              -- "Last cell" and "fork cell" are decided independently,
              -- so forkDistance == chunkSize - 1 still ends the chunk
              -- here instead of mapping the whole input eagerly.
              cell <-
                if i == forkDistance
                  then do
                    nextNextChunk <- new
                    let calcNextChunk = do
                          -- Skip the input cells this chunk still has
                          -- to map itself.
                          instr <- dropN (chunkSize - forkDistance - 1) t
                          loop 0 instr nextChunk nextNextChunk
                    return (Fork calcNextChunk (Cons (fn h) tl))
                  else return (Cons (fn h) tl)
              put outstrm cell
              if isLast then return () else loop (i + 1) t tl nextChunk
            Fork p tl -> do
              -- Let the upstream producer make its next chunk, then
              -- handle the cell it wraps.
              fork p
              processElement tl

    -- Advance past n elements of a stream without mapping them. A 'Fork'
    -- wrapper is stepped over without forking its computation: the chunk
    -- loop that maps those same cells forks it, and forking it here as
    -- well would run it a second time.
    dropN :: NFData a => Int -> Stream a -> Par (Stream a)
    dropN n str
      | n == 0 = return str
      | otherwise = do
          el <- get str
          case el of
            Nil -> do
              nv <- new
              put nv Nil
              return nv
            Cons _ tl -> dropN (n - 1) tl
            Fork _ tl -> do
              -- Re-wrap the inner cell so it can be examined by 'get'.
              nt <- new
              put nt tl
              dropN n nt
-- >>
-- | Reduce a stream to a single value. This function will not return
-- until it reaches the end-of-stream.
-- <<streamFold
-- Left fold over a stream with a strict accumulator. Any 'Fork' cell has
-- its computation forked before the cell it wraps is examined, so a
-- rate-limited producer keeps going while the fold consumes.
streamFold :: (a -> b -> a) -> a -> Stream b -> Par a
streamFold fn !acc instrm = get instrm >>= step
  where
    step Nil = return acc
    step (Cons h t) = streamFold fn (fn acc h) t
    -- Handles every wrapped cell, including Nil or another Fork, which
    -- keeps the match total.
    step (Fork p rest) = fork p >> step rest
-- >>
-- | Keep only the elements of a stream that satisfy @p@.
--
-- 'Fork' cells in the input have their computation forked and are not
-- reproduced in the output, so the output stream contains only 'Cons' and
-- 'Nil' cells.
streamFilter :: NFData a => (a -> Bool) -> Stream a -> Par (Stream a)
streamFilter p instr = do
  outstr <- new
  fork $ loop instr outstr
  return outstr
  where
    loop instr outstr = get instr >>= step
      where
        step Nil = put outstr Nil
        step (Cons x instr')
          | p x = do
              tl <- new
              put_ outstr (Cons x tl)
              loop instr' tl
          | otherwise = loop instr' outstr
        -- Without this alternative, a rate-limited input stream fails
        -- with a pattern-match error at its first Fork cell.
        step (Fork producer rest) = do
          fork producer
          step rest
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment