Skip to content

Instantly share code, notes, and snippets.

@dcoutts
Created June 21, 2016 16:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dcoutts/798812e040a61ad969c27a45549943c0 to your computer and use it in GitHub Desktop.
Save dcoutts/798812e040a61ad969c27a45549943c0 to your computer and use it in GitHub Desktop.
Two different approaches to incremental CBOR sequence files
-- | Pure lazy API on top of CBOR
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
module CBOR.Lazy (
-- * Encoding
EncodedElems(..)
, encodeElems
-- * Decoding
, DecodeLazy(..)
, DecoderState(..)
, initDecoderState
, decodeLazy
, decodeAllLazy
) where
import Prelude hiding (pred)
import Data.Binary.Get (Decoder(..))
import Data.Binary.Serialise.CBOR (Serialise)
import Data.Monoid
import qualified Data.Binary.Serialise.CBOR as CBOR (decode, encode)
import qualified Data.Binary.Serialise.CBOR.Decoding as CBOR
import qualified Data.Binary.Serialise.CBOR.Encoding as CBOR
import qualified Data.Binary.Serialise.CBOR.Read as CBOR
import qualified Data.ByteString as BS.S
import qualified Data.ByteString.Lazy as BS.L
{-------------------------------------------------------------------------------
Lazy encoding of lists
-------------------------------------------------------------------------------}
-- | Encoded list elements
--
-- CBOR supports lists of indefinite length and encodes them as
--
-- > <LIST_BEGIN> x1 x2 .. xn <BREAK>
--
-- In principle, this representation is perfectly suitable for both lazy
-- encoding and lazy decoding. Moreover, the standard 'encode' instance for
-- lists in the @binary-serialise-cbor@ library does in fact use this
-- representation (except for empty lists, where it uses a length prefix).
--
-- We do have a problem, however, which is that this encoding does not
-- distribute over concatenation:
--
-- > encode (xs <> ys) /= encode xs <> encode ys
--
-- since these will look like
--
-- > <LIST_BEGIN> x1 x2 .. xn y1 y2 .. ym <BREAK>
-- > <LIST_BEGIN> x1 x2 .. xn <BREAK> <LIST_BEGIN> y1 y2 .. ym <BREAK>
--
-- respectively.
--
-- 'EncodedElems' is the encoding of /just/ the elements of the list (no
-- list header and no terminating break), so that 'encodeElems' /does/
-- distribute over concatenation:
--
-- > encodeElems (xs ++ ys) = encodeElems xs <> encodeElems ys
--
-- allowing us to encode fragments of the list at a time.
--
-- The type parameter @a@ is phantom: it only records the element type.
newtype EncodedElems a = EncodedElems CBOR.Encoding
deriving (Monoid)
-- | Encode the elements of a list, one after another, without any list
-- header or terminator.
--
-- The per-element encodings are combined right-nested and lazily, so the
-- result can be produced incrementally. See 'EncodedElems' for why the
-- elements are encoded on their own.
encodeElems :: Serialise a => [a] -> EncodedElems a
encodeElems items = EncodedElems (foldMap CBOR.encode items)
{-------------------------------------------------------------------------------
Lazy decoding of lists
-------------------------------------------------------------------------------}
-- | Outcome of incrementally decoding a list: a lazily produced stream of
-- elements that finishes in one of three terminal states.
data DecodeLazy a =
-- | We found an element; the second field is the rest of the decoding
Elem a (DecodeLazy a)
-- | Waiting for more input; pass the saved state back to 'decodeLazy' along
-- with the next piece of input to resume
| Suspended (DecoderState a)
-- | We encountered an error (the message reported by the decoder)
| Failed String
-- | We reached the end of the list; carries the input that was left
-- unconsumed (possibly empty)
| End BS.L.ByteString
-- | Resumable state of the lazy list decoder, as carried by 'Suspended'.
data DecoderState a =
-- | We're still waiting to skip the list header
--
-- The decoder yields the list length when the header carries one.
SkipHeader (Decoder (Maybe Int))
-- | We're waiting for more input until we can decode the next element
-- (or know that we have reached the end of the list)
--
-- We may or may not know how many elements we are still expecting; the
-- second field is the remaining count when it is known.
| NextElem (Decoder (Maybe a)) (Maybe Int)
-- | The state to start decoding from: nothing has been consumed yet, so the
-- first thing to read is the list header (a length, or the indefinite-length
-- marker).
initDecoderState :: DecoderState a
initDecoderState =
    SkipHeader (CBOR.deserialiseIncremental CBOR.decodeListLenOrIndef)
-- | Lazily decode a list from a bytestring that is known to be complete.
--
-- Elements are produced on demand. Trailing input after the end of the list
-- is silently discarded. Problems are reported with 'error': either the
-- decoder's failure message, or a note that the input ran out before the
-- list was terminated.
decodeAllLazy :: Serialise a => BS.L.ByteString -> [a]
decodeAllLazy input = collect (decodeLazy initDecoderState input)
  where
    -- Flatten the decoding result into an ordinary list.
    collect :: DecodeLazy b -> [b]
    collect result =
      case result of
        Elem x rest   -> x : collect rest
        End _trailing -> []
        Suspended _   -> error "decodeAllLazy: list not terminated"
        Failed msg    -> error ("decodeAllLazy: " ++ msg)
-- | Feed a piece of input to the lazy list decoder.
--
-- Starting from the given state, the input is consumed chunk by chunk and
-- every element that can be decoded is emitted as an 'Elem'. The result
-- ends with
--
-- * 'Suspended' when the input ran out while the decoder still wanted more
--   (resume by calling 'decodeLazy' again with the returned state),
-- * 'Failed' when the underlying decoder reported an error, or
-- * 'End' when the list is finished, carrying whatever input was not used.
--
-- NOTE(review): the local signatures below mention the outer @a@, which
-- needs @ScopedTypeVariables@; confirm the extension is enabled for this
-- module.
decodeLazy :: forall a. Serialise a
           => DecoderState a
           -> BS.L.ByteString
           -> DecodeLazy a
decodeLazy state input =
    case state of
      SkipHeader dec         -> header dec chunks
      NextElem dec remaining -> element dec remaining chunks
  where
    chunks :: [BS.S.ByteString]
    chunks = BS.L.toChunks input

    -- Drive the header decoder; once it is done, move on to the elements.
    header :: Decoder (Maybe Int) -> [BS.S.ByteString] -> DecodeLazy a
    header dec rest =
      case dec of
        Done leftover _ len -> element decodeElem len (prepend leftover rest)
        Fail _ _ msg        -> Failed msg
        Partial feed        ->
          case rest of
            []     -> Suspended (SkipHeader dec)
            c : cs -> header (feed (Just c)) cs

    -- Drive the element decoder. The remaining count is examined before the
    -- decoder itself: a known count of zero ends the list immediately.
    element :: Decoder (Maybe a) -> Maybe Int -> [BS.S.ByteString]
            -> DecodeLazy a
    element dec remaining rest =
      case remaining of
        Just 0 -> End (BS.L.fromChunks rest)
        _      ->
          case dec of
            Fail _ _ msg ->
              Failed msg
            Partial feed ->
              case rest of
                []     -> Suspended (NextElem dec remaining)
                c : cs -> element (feed (Just c)) remaining cs
            Done leftover _ result ->
              let rest' = prepend leftover rest
              in case result of
                   Nothing -> End (BS.L.fromChunks rest')
                   Just x  ->
                     Elem x (element decodeElem (decrement remaining) rest')

    -- Decoder for a single element
    --
    -- Yields 'Nothing' when the break marker is found instead of an element
    decodeElem :: Decoder (Maybe a)
    decodeElem = CBOR.deserialiseIncremental $ do
      atBreak <- CBOR.decodeBreakOr
      if atBreak
        then return Nothing
        else fmap Just CBOR.decode

    -- Put unconsumed input back in front of the pending chunks, skipping it
    -- when it is empty.
    prepend :: BS.S.ByteString -> [BS.S.ByteString] -> [BS.S.ByteString]
    prepend leftover rest =
      if BS.S.null leftover then rest else leftover : rest

    -- One fewer element expected, when the count is known.
    decrement :: Maybe Int -> Maybe Int
    decrement = fmap (subtract 1)
{-# LANGUAGE ScopedTypeVariables #-}
module SequenceFiles (
writeBinaryFileSequence,
appendBinaryFileSequence,
hPutBinaryFileSequence,
readBinaryFileSequenceLazy,
withBinaryFileSequenceLazy,
withBinaryFileSequence,
) where
import Data.Monoid
--import qualified Data.Binary as B
--import qualified Data.Binary.Put as B
import qualified Data.Binary.Serialise.CBOR as B
import qualified Data.Binary.Serialise.CBOR.Write as B
import qualified Data.Binary.Get as B
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as LBS
import qualified Data.ByteString.Builder.Extra as BS
import Data.IORef
import Foreign
import GHC.ForeignPtr (mallocPlainForeignPtrBytes)
import System.IO
import System.IO.Error
import Control.Exception
-- | Open @file@ in 'WriteMode' and stream the encoded items into it,
-- incrementally.
--
-- The file format is nothing more than the encodings of the items laid back
-- to back: no header, no count, no terminator.
--
writeBinaryFileSequence :: B.Serialise a => FilePath -> [a] -> IO ()
writeBinaryFileSequence file xs =
    withFile file WriteMode (`hPutBinaryFileSequence` xs)
-- | Append a sequence of items to the end of @file@ (opened in
-- 'AppendMode'). The items are written out incrementally.
--
-- The items are encoded exactly as by 'writeBinaryFileSequence', i.e. simply
-- back to back, so the result reads back as one longer sequence.
--
appendBinaryFileSequence :: B.Serialise a => FilePath -> [a] -> IO ()
appendBinaryFileSequence file xs =
    withFile file AppendMode $ flip hPutBinaryFileSequence xs
-- | Encode the items and write them back to back to an already open handle,
-- going through a freshly allocated buffer of 'BS.defaultChunkSize' bytes.
hPutBinaryFileSequence :: B.Serialise a => Handle -> [a] -> IO ()
hPutBinaryFileSequence hnd xs =
    newBuffer BS.defaultChunkSize >>= \buf ->
      hPutBinaryFileSequenceWithBuffer hnd buf xs
-- | Encode the items back to back and write them to the handle, using the
-- supplied buffer as scratch space for the builder.
--
-- Each round runs the builder into the buffer and immediately writes the
-- filled part to the handle, so the buffer is empty again before the next
-- round.
hPutBinaryFileSequenceWithBuffer :: B.Serialise a
                                 => Handle -> Buffer -> [a] -> IO ()
hPutBinaryFileSequenceWithBuffer hnd buf0 items =
    drain buf0 (BS.runBuilder (B.toBuilder encoded))
  where
    -- All items, encoded one after another (built lazily, right-nested).
    encoded = foldr (\item rest -> B.encode item <> rest) mempty items

    drain :: Buffer -> BS.BufferWriter -> IO ()
    drain buf writer = do
      signal <- withBuffer buf $ \ptr capacity -> do
        -- let the builder fill the buffer, then flush what it produced
        (filled, signal') <- writer ptr capacity
        hPutBuf hnd ptr filled
        return signal'
      case signal of
        BS.Done ->
          return ()
        BS.Chunk chunk writer' ->
          -- the builder handed us a ready-made chunk: write it directly
          BS.hPut hnd chunk >> drain buf writer'
        BS.More minSize writer'
          -- the builder needs more room than we have; unlikely, since the
          -- buffer is flushed on every round
          | bufferSize buf < minSize ->
              newBuffer minSize >>= \bigger -> drain bigger writer'
          | otherwise ->
              drain buf writer'
-- | A raw byte buffer: the allocated memory together with its size in bytes.
-- Both fields are strict and unpacked.
data Buffer = Buffer {-# UNPACK #-} !(ForeignPtr Word8) {-# UNPACK #-} !Int
-- | The size, in bytes, that the buffer was allocated with.
bufferSize :: Buffer -> Int
bufferSize buf =
    case buf of
      Buffer _ size -> size
-- | Allocate a new buffer of the given size in bytes.
newBuffer :: Int -> IO Buffer
newBuffer size =
    mallocPlainForeignPtrBytes size >>= \fptr ->
      return $! Buffer fptr size
-- | Run an action with a pointer to the start of the buffer and the buffer's
-- size. The pointer is obtained with 'withForeignPtr', so it should only be
-- used inside the action.
withBuffer :: Buffer -> (Ptr Word8 -> Int -> IO a) -> IO a
withBuffer buf body =
    case buf of
      Buffer fptr size -> withForeignPtr fptr (`body` size)
-- | Read a file consisting of a sequence of items. The items are read in
-- incrementally. The body action is given an action that can be called
-- repeatedly to get each item. It returns @Nothing@ once the end of the file
-- has been reached.
--
-- The file is only open for the duration of the body action, so the item
-- action must not be used after the body returns.
--
-- A decoding failure is raised as an 'IOError' (of 'userErrorType') that
-- carries the decoder's message, the handle and the file path.
--
-- The file format is that used by 'writeBinaryFileSequence'.
--
withBinaryFileSequence :: forall a b. B.Serialise a
                       => FilePath
                       -> (IO (Maybe a) -> IO b)
                       -> IO b
withBinaryFileSequence file action = do
    -- input already read from the file but not yet consumed by the decoder
    trailingRef <- newIORef BS.empty
    withFile file ReadMode $ \hnd ->
      action (readNextChunk hnd trailingRef)
  where
    -- Decode one item, or return Nothing at end of file.
    readNextChunk :: Handle -> IORef BS.ByteString -> IO (Maybe a)
    readNextChunk hnd trailingRef = do
      initial <- do
        trailing <- readIORef trailingRef
        -- prefer leftover input from the previous item over a fresh read
        if BS.null trailing
          then BS.hGetSome hnd BS.defaultChunkSize
          else return trailing
      -- no leftover and nothing more to read: the sequence is finished
      if BS.null initial
        then return Nothing
        else Just <$> go hnd trailingRef (B.pushChunk initialDecoder initial)

    initialDecoder :: B.Decoder a
    initialDecoder = B.deserialiseIncremental

    -- Keep feeding the decoder from the handle until it finishes one item.
    go :: Handle -> IORef BS.ByteString -> B.Decoder a -> IO a
    go hnd trailingRef (B.Partial k) = do
      chunk <- BS.hGetSome hnd BS.defaultChunkSize
      -- an empty read means end of file; tell the decoder so it can stop
      go hnd trailingRef (k (if BS.null chunk then Nothing else Just chunk))
    go _ trailingRef (B.Done trailing _ x) = do
      -- remember the unconsumed input for the next item
      writeIORef trailingRef trailing
      return x
    go hnd _ (B.Fail _ _ msg) =
      -- include the file path, as 'decodeSequence' does for its errors
      ioError $ mkIOError userErrorType msg (Just hnd) (Just file)
-- | Lazily read a file consisting of a sequence of items and hand the lazy
-- list of items to the body action.
--
-- The file contents are read lazily from a handle that is only open for the
-- duration of the body action, so the list must be consumed inside it.
--
-- The file format is that used by 'writeBinaryFileSequence'.
--
withBinaryFileSequenceLazy :: forall a b. B.Serialise a
                           => FilePath -> ([a] -> IO b) -> IO b
withBinaryFileSequenceLazy file action =
    withFile file ReadMode $ \hnd -> do
      contents <- LBS.hGetContents hnd
      action (decodeSequence file contents)
-- | Lazily read a file consisting of a sequence of items: the file is read
-- with a lazy 'LBS.readFile' and items are decoded as the list is consumed.
--
-- The file format is that used by 'writeBinaryFileSequence'.
--
readBinaryFileSequenceLazy :: forall a. B.Serialise a => FilePath -> IO [a]
readBinaryFileSequenceLazy file =
    fmap (decodeSequence file) (LBS.readFile file)
-- | Lazily decode a sequence of back-to-back encoded items from a lazy
-- bytestring.
--
-- Items are produced one at a time as the list is consumed. Empty input
-- decodes to the empty list. A decoding failure (including input that ends
-- in the middle of an item) is thrown as an 'IOError' of 'userErrorType'
-- carrying the decoder's message and the given file path; because this is
-- pure code, the exception surfaces when the offending part of the list is
-- forced.
--
-- The file path is used for error reporting only.
decodeSequence :: forall a. B.Serialise a => FilePath -> LBS.ByteString -> [a]
decodeSequence file =
    start . LBS.toChunks
  where
    initialDecoder :: B.Decoder a
    initialDecoder = B.deserialiseIncremental

    -- Begin decoding the next item, unless no input is left at all. Checking
    -- here, rather than feeding end-of-input to a fresh decoder, is what
    -- makes empty input decode to [] instead of failing.
    start :: [BS.ByteString] -> [a]
    start []             = []
    start (chunk:chunks)
      | BS.null chunk    = start chunks
      | otherwise        = go initialDecoder (chunk : chunks)

    -- Feed chunks to the decoder until it finishes the current item.
    go :: B.Decoder a -> [BS.ByteString] -> [a]
    go (B.Partial k)         []             = go (k Nothing) []
    go (B.Partial k)         (chunk:chunks) = go (k (Just chunk)) chunks
    go (B.Done trailing _ x) chunks         = x : start (trailing : chunks)
    go (B.Fail _ _ msg)      _              = throw ioerr
      where ioerr = mkIOError userErrorType msg Nothing (Just file)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment