Skip to content

Instantly share code, notes, and snippets.

@gregorycollins
Created April 25, 2013 17:41
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gregorycollins/5461584 to your computer and use it in GitHub Desktop.
Save gregorycollins/5461584 to your computer and use it in GitHub Desktop.
-- | Internal implementation of the @io-streams@ library, intended for library
-- writers
--
-- Library users should use the interface provided by "System.IO.Streams"
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeSynonymInstances #-}
module System.IO.Streams.Internal2 where
------------------------------------------------------------------------------
import Control.Applicative (Applicative (..))
import Control.Concurrent (newMVar, withMVar)
import Control.Exception (throwIO)
import Control.Monad (when)
import Control.Monad.IO.Class (MonadIO (..))
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as S
import qualified Data.ByteString.Internal as S
import qualified Data.ByteString.Unsafe as S
import Data.IORef (newIORef, readIORef, writeIORef)
import Data.Maybe (isNothing)
import Data.Typeable (Typeable)
import Data.Word (Word8)
import Foreign.Marshal.Utils (copyBytes)
import Foreign.Ptr (castPtr)
import qualified GHC.IO.Buffer as H
import qualified GHC.IO.BufferedIO as H
import qualified GHC.IO.Device as H
import GHC.IO.Exception (unsupportedOperation)
import Prelude hiding (read)
------------------------------------------------------------------------------
-- | A strict pair type.
data SP a b = SP !a !b
deriving (Typeable)
------------------------------------------------------------------------------
data InputStream a = InputStream {
_read :: IO (Maybe a)
, _unRead :: a -> IO ()
} deriving (Typeable)
------------------------------------------------------------------------------
data OutputStream a = OutputStream {
_write :: Maybe a -> IO ()
} deriving (Typeable)
------------------------------------------------------------------------------
read :: InputStream a -> IO (Maybe a)
read = _read
{-# INLINE read #-}
------------------------------------------------------------------------------
write :: Maybe a -> OutputStream a -> IO ()
write = flip _write
{-# INLINE write #-}
------------------------------------------------------------------------------
peek :: InputStream a -> IO (Maybe a)
peek s = do
x <- read s
maybe (return $! ()) (_unRead s) x
return x
------------------------------------------------------------------------------
unRead :: a -> InputStream a -> IO ()
unRead = flip _unRead
------------------------------------------------------------------------------
-- | Connects an 'InputStream' and 'OutputStream', supplying values from the
-- 'InputStream' to the 'OutputStream', and propagating the end-of-stream
-- message from the 'InputStream' through to the 'OutputStream'.
--
-- The connection ends when the 'InputStream' yields a 'Nothing'.
connect :: InputStream a -> OutputStream a -> IO ()
connect p q = loop
where
loop = do
m <- read p
maybe (write Nothing q)
(const $ write m q >> loop)
m
{-# INLINE connect #-}
------------------------------------------------------------------------------
-- | The 'connectTo' function is just @'flip' 'connect'@.
--
-- Useful for writing expressions like @fromList [1,2,3] >>= connectTo foo@.
--
connectTo :: OutputStream a -> InputStream a -> IO ()
connectTo = flip connect
{-# INLINE connectTo #-}
------------------------------------------------------------------------------
-- | Connects an 'InputStream' to an 'OutputStream' without passing the
-- end-of-stream notification through to the 'OutputStream'.
--
-- Use this to supply an 'OutputStream' with multiple 'InputStream's and use
-- 'connect' for the final 'InputStream' to finalize the 'OutputStream', like
-- so:
--
-- @
-- do Streams.'supply' input1 output
-- Streams.'supply' input2 output
-- Streams.'connect' input3 output
-- @
--
supply :: InputStream a -> OutputStream a -> IO ()
supply p q = loop
where
loop = do
m <- read p
maybe (return $! ())
(const $ write m q >> loop)
m
{-# INLINE supply #-}
------------------------------------------------------------------------------
-- | 'supply' with the arguments flipped.
supplyTo :: OutputStream a -> InputStream a -> IO ()
supplyTo = flip supply
{-# INLINE supplyTo #-}
------------------------------------------------------------------------------
-- | Creates an 'InputStream' from a value-producing action.
--
-- (@makeInputStream m@) calls the action @m@ each time you request a value
-- from the 'InputStream'. The given action is extended with the default
-- pushback mechanism (see "System.IO.Streams.Internal#pushback").
makeInputStream :: IO (Maybe a) -> IO (InputStream a)
makeInputStream m = do
doneRef <- newIORef False
pbRef <- newIORef []
return $! InputStream (grab doneRef pbRef) (pb pbRef)
where
grab doneRef pbRef = do
l <- readIORef pbRef
case l of
[] -> do done <- readIORef doneRef
if done
then return Nothing
else do
x <- m
when (isNothing x) $ writeIORef doneRef True
return x
(x:xs) -> writeIORef pbRef xs >> (return $! Just x)
pb ref x = readIORef ref >>= \xs -> writeIORef ref (x:xs)
{-# INLINE makeInputStream #-}
------------------------------------------------------------------------------
-- | Creates an 'OutputStream' from a value-consuming action.
--
-- (@makeOutputStream f@) runs the computation @f@ on each value fed to it.
makeOutputStream :: (Maybe a -> IO ()) -> IO (OutputStream a)
makeOutputStream = return . OutputStream
------------------------------------------------------------------------------
-- | Converts an 'InputStream' into a thread-safe 'InputStream', at a slight
-- performance penalty.
--
-- For performance reasons, this library provides non-thread-safe streams by
-- default. Use the @locking@ functions to convert these streams into slightly
-- slower, but thread-safe, equivalents.
lockingInputStream :: InputStream a -> IO (InputStream a)
lockingInputStream s = do
mv <- newMVar $! ()
return $! InputStream (grab mv) (pb mv)
where
grab mv = withMVar mv $ const $ read s
pb mv x = withMVar mv $ const $ unRead x s
{-# INLINE lockingInputStream #-}
------------------------------------------------------------------------------
-- | Converts an 'OutputStream' into a thread-safe 'OutputStream', at a slight
-- performance penalty.
--
-- For performance reasons, this library provides non-thread-safe streams by
-- default. Use the @locking@ functions to convert these streams into slightly
-- slower, but thread-safe, equivalents.
lockingOutputStream :: OutputStream a -> IO (OutputStream a)
lockingOutputStream s = do
mv <- newMVar $! ()
makeOutputStream $ f mv
where
f mv x = withMVar mv $ const $ write x s
{-# INLINE lockingOutputStream #-}
------------------------------------------------------------------------------
-- | An empty 'InputStream' that yields 'Nothing' immediately.
nullInput :: IO (InputStream a)
nullInput = makeInputStream $ return Nothing
------------------------------------------------------------------------------
-- | An empty 'OutputStream' that discards any input fed to it.
nullOutput :: IO (OutputStream a)
nullOutput = makeOutputStream $ const $ return $! ()
------------------------------------------------------------------------------
-- | 'appendInputStream' concatenates two 'InputStream's, analogous to ('++')
-- for lists.
--
-- The second 'InputStream' continues where the first 'InputStream' ends.
--
-- Note: values pushed back to 'appendInputStream' are not propagated to either
-- wrapped 'InputStream'.
appendInputStream :: InputStream a -> InputStream a -> IO (InputStream a)
appendInputStream s1 s2 = concatInputStreams [s1, s2]
------------------------------------------------------------------------------
-- | 'concatInputStreams' concatenates a list of 'InputStream's, analogous to
-- ('++') for lists.
--
-- Subsequent 'InputStream's continue where the previous one 'InputStream'
-- ends.
--
-- Note: values pushed back to the 'InputStream' returned by
-- 'concatInputStreams' are not propagated to any of the source
-- 'InputStream's.
concatInputStreams :: [InputStream a] -> IO (InputStream a)
concatInputStreams inputStreams = do
ref <- newIORef inputStreams
makeInputStream $! run ref
where
run ref = go
where
go = do
streams <- readIORef ref
case streams of
[] -> return Nothing
(s:rest) -> do
next <- read s
case next of
Nothing -> writeIORef ref rest >> go
Just _ -> return next
------------------------------------------------------------------------------
-- | Checks if an 'InputStream' is at end-of-stream.
atEOF :: InputStream a -> IO Bool
atEOF s = read s >>= maybe (return True) (\k -> unRead k s >> return False)
------------------------------------------------------------------------------
-- $pushback
-- #pushback#
--
-- Users can push a value back into an input stream using the 'unRead'
-- function. Usually this will use the default pushback mechanism which
-- provides a buffer for the stream. Some stream transformers, like
-- 'takeBytes', produce streams that send pushed-back values back to the
-- streams that they wrap. A function like 'System.IO.Streams.Combinators.map'
-- cannot do this because the types don't match up:
--
-- @
-- 'System.IO.Streams.Combinators.map' :: (a -> b) -> 'InputStream' a -> 'IO' ('InputStream' b)
-- @
--
-- A function will usually document if its pushback behaviour differs from the
-- default. No matter what the case, input streams should obey the following
-- law:
--
-- @
-- Streams.'unRead' c stream >> Streams.'read' stream === 'return' ('Just' c)
-- @
--------------------------------------------
-- Typeclass instances for Handle support --
--------------------------------------------
------------------------------------------------------------------------------
bUFSIZ :: Int
bUFSIZ = 32752
------------------------------------------------------------------------------
unsupported :: IO a
unsupported = throwIO unsupportedOperation
------------------------------------------------------------------------------
bufferToBS :: H.Buffer Word8 -> ByteString
bufferToBS buf = S.copy $! S.fromForeignPtr raw l sz
where
raw = H.bufRaw buf
l = H.bufL buf
r = H.bufR buf
sz = r - l
------------------------------------------------------------------------------
instance H.RawIO (InputStream ByteString) where
read is ptr n = read is >>= maybe (return 0) f
where
f s = S.unsafeUseAsCStringLen s $ \(cstr, l) -> do
let c = min n l
copyBytes ptr (castPtr cstr) c
return $! c
readNonBlocking _ _ _ = unsupported
write _ _ _ = unsupported
writeNonBlocking _ _ _ = unsupported
------------------------------------------------------------------------------
instance H.RawIO (OutputStream ByteString) where
read _ _ _ = unsupported
readNonBlocking _ _ _ = unsupported
write os ptr n = S.packCStringLen (castPtr ptr, n) >>=
flip write os . Just
writeNonBlocking _ _ _ = unsupported
------------------------------------------------------------------------------
-- | Internal convenience synonym for a pair of input\/output streams.
type StreamPair a = SP (InputStream a) (OutputStream a)
instance H.RawIO (StreamPair ByteString) where
read (SP is _) ptr n = H.read is ptr n
readNonBlocking _ _ _ = unsupported
write (SP _ os) ptr n = H.write os ptr n
writeNonBlocking _ _ _ = unsupported
------------------------------------------------------------------------------
instance H.BufferedIO (OutputStream ByteString) where
newBuffer !_ bs = H.newByteBuffer bUFSIZ bs
fillReadBuffer !_ _ = unsupported
fillReadBuffer0 !_ _ = unsupported
flushWriteBuffer !os !buf = do
write (Just $! bufferToBS buf) os
emptyWriteBuffer buf
flushWriteBuffer0 !os !buf = do
let s = bufferToBS buf
let l = S.length s
write (Just s) os
buf' <- emptyWriteBuffer buf
return $! (l, buf')
------------------------------------------------------------------------------
instance H.BufferedIO (InputStream ByteString) where
newBuffer !_ !bs = H.newByteBuffer bUFSIZ bs
fillReadBuffer !is !buf = H.readBuf is buf
fillReadBuffer0 _ _ = unsupported
flushWriteBuffer _ _ = unsupported
flushWriteBuffer0 _ _ = unsupported
------------------------------------------------------------------------------
instance H.BufferedIO (StreamPair ByteString) where
newBuffer !_ bs = H.newByteBuffer bUFSIZ bs
fillReadBuffer (SP is _) = H.fillReadBuffer is
fillReadBuffer0 _ _ = unsupported
flushWriteBuffer (SP _ !os) = H.flushWriteBuffer os
flushWriteBuffer0 (SP _ !os) = H.flushWriteBuffer0 os
------------------------------------------------------------------------------
instance H.IODevice (OutputStream ByteString) where
ready _ _ _ = return True
close = write Nothing
devType _ = return H.Stream
------------------------------------------------------------------------------
instance H.IODevice (InputStream ByteString) where
ready _ _ _ = return True
close _ = return $! ()
devType _ = return H.Stream
------------------------------------------------------------------------------
instance H.IODevice (StreamPair ByteString) where
ready _ _ _ = return True
close (SP _ os) = write Nothing os
devType _ = return H.Stream
------------------------------------------------------------------------------
emptyWriteBuffer :: H.Buffer Word8
-> IO (H.Buffer Word8)
emptyWriteBuffer buf
= return buf { H.bufL=0, H.bufR=0, H.bufState = H.WriteBuffer }
------------------------------------------------------------------------------
-- | A 'Generator' is a coroutine monad that can be used to define complex
-- 'InputStream's. You can cause a value of type @Just r@ to appear when the
-- 'InputStream' is read by calling 'yield':
--
-- @
-- g :: 'Generator' Int ()
-- g = do
-- Streams.'yield' 1
-- Streams.'yield' 2
-- Streams.'yield' 3
-- @
--
-- A 'Generator' can be turned into an 'InputStream' by calling
-- 'fromGenerator':
--
-- @
-- m :: 'IO' ['Int']
-- m = Streams.'fromGenerator' g >>= Streams.'System.IO.Streams.toList' \-\- value returned is [1,2,3]
-- @
--
-- You can perform IO by calling 'liftIO', and turn a 'Generator' into an
-- 'InputStream' with 'fromGenerator'.
--
-- As a general rule, you should not acquire resources that need to be freed
-- from a 'Generator', because there is no guarantee the coroutine continuation
-- will ever be called, nor can you catch an exception from within a
-- 'Generator'.
newtype Generator r a = Generator {
unG :: IO (Either (SP r (Generator r a)) a)
} deriving (Typeable)
------------------------------------------------------------------------------
generatorBind :: Generator r a -> (a -> Generator r b) -> Generator r b
generatorBind (Generator m) f = Generator (m >>= either step value)
where
step (SP v r) = return $! Left $! SP v (generatorBind r f)
value = unG . f
{-# INLINE generatorBind #-}
------------------------------------------------------------------------------
instance Monad (Generator r) where
return = Generator . return . Right
(>>=) = generatorBind
------------------------------------------------------------------------------
instance MonadIO (Generator r) where
liftIO = Generator . (Right `fmap`)
------------------------------------------------------------------------------
instance Functor (Generator r) where
fmap f (Generator m) = Generator $ m >>= either step value
where
step (SP v m') = return $! Left $! SP v (fmap f m')
value v = return $! Right $! f v
------------------------------------------------------------------------------
instance Applicative (Generator r) where
pure = Generator . return . Right
m <*> n = do
f <- m
v <- n
return $! f v
------------------------------------------------------------------------------
-- | Calling @'yield' x@ causes the value @'Just' x@ to appear on the input
-- when this generator is converted to an 'InputStream'. The rest of the
-- computation after the call to 'yield' is resumed later when the
-- 'InputStream' is 'read' again.
yield :: r -> Generator r ()
yield x = Generator $! return $! Left $! SP x (return $! ())
------------------------------------------------------------------------------
-- | Turns a 'Generator' into an 'InputStream'.
fromGenerator :: Generator r a -> IO (InputStream r)
fromGenerator (Generator m) = do
ref <- newIORef m
makeInputStream $! go ref
where
go ref = readIORef ref >>= (\n -> n >>= either step finish)
where
step (SP v gen) = do
writeIORef ref $! unG gen
return $! Just v
finish _ = return Nothing
------------------------------------------------------------------------------
newtype Consumer c a = Consumer {
unC :: IO (Either (Maybe c -> Consumer c a) a)
} deriving (Typeable)
------------------------------------------------------------------------------
instance Monad (Consumer c) where
return = Consumer . return . Right
(Consumer m) >>= f = Consumer $ m >>= either step value
where
step g = return $! Left $! (>>= f) . g
value v = unC $ f v
------------------------------------------------------------------------------
instance MonadIO (Consumer c) where
liftIO = Consumer . fmap Right
------------------------------------------------------------------------------
instance Functor (Consumer r) where
fmap f (Consumer m) = Consumer (m >>= either step value)
where
step g = return $! Left $! (fmap f) . g
value v = return $! Right $! f v
------------------------------------------------------------------------------
instance Applicative (Consumer r) where
pure = return
m <*> n = do
f <- m
v <- n
return $! f v
------------------------------------------------------------------------------
await :: Consumer r (Maybe r)
await = Consumer $ return (Left return)
------------------------------------------------------------------------------
fromConsumer :: Consumer r a -> IO (OutputStream r)
fromConsumer c0 = newIORef c0 >>= makeOutputStream . go
where
go ref mb = do
c <- readIORef ref
c' <- unC c >>= either step (const $! return c)
writeIORef ref c'
where
force c = do e <- unC c
return $! Consumer $! return e
step g = force $! g mb
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment