Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
little streaming library
{-#LANGUAGE BangPatterns #-}
import GHC.Magic
import Data.IORef
import Control.Monad
import Control.Monad.Trans
-- | A stream of elements of type @a@, with effects in the base monad @m@,
-- that finishes with a result of type @r@.
data Stream a m r
  = Yield a (Stream a m r)
    -- ^ Emit one element, followed by the remainder of the stream.
  | Done r
    -- ^ The stream is finished; carries the final result.
  | Delay (() -> m (Stream a m r))
    -- ^ A suspended effect: applying the function to @()@ runs an action in
    -- @m@ that produces the remainder of the stream.
-- | 'fmap' transforms only the final result of a stream. Yielded elements
-- are passed through untouched, and delayed steps are mapped inside the base
-- functor @m@.
instance Functor m => Functor (Stream a m) where
  fmap f = go
    where
      -- Walk the stream, rebuilding it with @f@ applied at the 'Done' leaf.
      go stream = case stream of
        Yield a rest -> Yield a (go rest)
        Done r -> Done (f r)
        Delay thunk -> Delay (\() -> fmap go (thunk ()))
-- | Applicative structure derived from the 'Monad' instance below.
--
-- 'pure' is defined directly as 'Done' (rather than via 'return') so that the
-- instances are canonical: 'return' can then safely default to 'pure'.
instance Monad m => Applicative (Stream a m) where
  pure = Done
  (<*>) = ap

-- | Sequencing streams: the continuation is run once the first stream reaches
-- its 'Done' result (see 'bind_').
--
-- 'return' is intentionally not defined here; its class default is 'pure',
-- i.e. 'Done'.
instance Monad m => Monad (Stream a m) where
  (>>=) = bind_
-- | Monadic bind for streams: replace the 'Done' leaf of the first stream
-- with the stream produced by the continuation, keeping every 'Yield' and
-- 'Delay' step in place.
bind_ :: Functor m => Stream a m r -> (r -> Stream a m s) -> Stream a m s
bind_ stream0 k = go stream0
  where
    go (Done r) = k r
    go (Yield a rest) = Yield a (go rest)
    go (Delay thunk) = Delay (\() -> fmap go (thunk ()))
-- | A stream over any 'MonadIO' base can run 'IO' actions; the work is
-- delegated to 'liftio'.
instance MonadIO m => MonadIO (Stream a m) where
  -- Inline the method so call sites mention 'liftio' directly; presumably
  -- this is what lets the rewrite rule on 'liftio' apply to them.
  {-# INLINE liftIO #-}
  liftIO = liftio
-- | Lift an 'IO' action into a stream: a single 'Delay' step that runs the
-- action in the base monad and then finishes with its result wrapped in
-- 'Done'. No element is yielded.
--
-- The thunk is wrapped in 'oneShot' (from "GHC.Magic"), a hint to GHC that
-- the lambda is called at most once.
liftio :: MonadIO m => IO r -> Stream a m r
liftio m = Delay (oneShot (\() -> fmap Done (liftIO m)))
-- Phase-controlled inlining: 'liftio' is not inlined before simplifier
-- phase 1, presumably so that the rewrite rule below gets a chance to match
-- calls to it first.
{-#INLINE [1] liftio #-}
-- Rewrite rule: @liftio m >>= f@ becomes one 'Delay' step that applies the
-- continuation @f@ directly to the action's result, instead of first
-- building a 'Done' and then binding through it. By the definition of
-- 'bind_' this is the same stream, provided @fmap@ for @m@ obeys the functor
-- composition law.
-- NOTE(review): the left-hand side mentions the class method '>>='; whether
-- the rule actually fires depends on GHC's inlining order -- confirm with
-- -ddump-rule-firings.
{-#RULES
"liftio hack" forall m f . liftio m >>= f = Delay (oneShot (\() -> fmap f (liftIO m)))
#-}
-- | A stream that emits exactly one element and then finishes with @()@.
yield :: Monad m => a -> Stream a m ()
yield element = element `Yield` Done ()
-- | Advance a stream to its next observable event, running any delayed
-- effects encountered on the way.
--
-- Returns @Left r@ when the stream has finished with result @r@, or
-- @Right (a, rest)@ for the next element @a@ together with the remainder of
-- the stream.
next :: Monad m => Stream t m a -> m (Either a (t, Stream t m a))
next (Done r) = return (Left r)
next (Yield a rest) = return (Right (a, rest))
next (Delay thunk) = next =<< thunk ()
-- | Build a stream that, for each index from @m@ up to (but excluding)
-- @limit@, reads the current contents of @ref@ and yields that value.
-- Yields nothing when @m >= limit@. The index is forced on every call.
stream :: Int -> Int -> IORef Int -> Stream Int IO ()
stream !m limit ref
  | m < limit = do
      n <- liftIO (readIORef ref)
      yield n
      stream (m + 1) limit ref
  | otherwise = return ()
-- | Drain a stream, counting its elements.
--
-- After each element is received, the running count (already evaluated) is
-- written to @ref@. The element values themselves and the stream's final
-- result are ignored. Returns the total number of elements seen.
counter :: IORef Int -> Stream Int IO r -> IO Int
counter ref = go 0
  where
    go seen str =
      next str >>= \step -> case step of
        Left _ -> return seen
        Right (_, rest) -> do
          let seen' = seen + 1
          -- Force the new count before storing it so no thunk ends up in
          -- the reference.
          writeIORef ref $! seen'
          go seen' rest
-- | Benchmark driver: build one stream value over a shared counter reference
-- and consume that same value twice, printing the contents of the reference
-- after each pass.
main :: IO ()
main = do
  ref <- newIORef 0
  let large = 10000000 :: Int
      send = stream 0 large ref
      -- One pass: drain the stream (the count returned by 'counter' is
      -- deliberately discarded) and print what was left in the reference.
      runOnce = do
        _ <- counter ref send
        readIORef ref >>= print
  runOnce
  runOnce
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.