Created
June 21, 2016 16:38
-
-
Save dcoutts/798812e040a61ad969c27a45549943c0 to your computer and use it in GitHub Desktop.
Two different approaches to incremental CBOR sequence files
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- | Pure lazy API on top of CBOR | |
{-# LANGUAGE BangPatterns #-} | |
{-# LANGUAGE GeneralizedNewtypeDeriving #-} | |
module CBOR.Lazy ( | |
-- * Encoding | |
EncodedElems(..) | |
, encodeElems | |
-- * Decoding | |
, DecodeLazy(..) | |
, DecoderState(..) | |
, initDecoderState | |
, decodeLazy | |
, decodeAllLazy | |
) where | |
import Prelude hiding (pred) | |
import Data.Binary.Get (Decoder(..)) | |
import Data.Binary.Serialise.CBOR (Serialise) | |
import Data.Monoid | |
import qualified Data.Binary.Serialise.CBOR as CBOR (decode, encode) | |
import qualified Data.Binary.Serialise.CBOR.Decoding as CBOR | |
import qualified Data.Binary.Serialise.CBOR.Encoding as CBOR | |
import qualified Data.Binary.Serialise.CBOR.Read as CBOR | |
import qualified Data.ByteString as BS.S | |
import qualified Data.ByteString.Lazy as BS.L | |
{------------------------------------------------------------------------------- | |
Lazy encoding of lists | |
-------------------------------------------------------------------------------} | |
-- | Encoded list elements
--
-- CBOR supports lists of indefinite length and encodes them as
--
-- > <LIST_BEGIN> x1 x2 .. xn <BREAK>
--
-- In principle, this representation is perfectly suitable for both lazy
-- encoding and lazy decoding. Moreover, the standard 'encode' instance for
-- lists in the @binary-serialise-cbor@ library does in fact use this
-- representation (except for empty lists, where it uses a length prefix).
--
-- We do have a problem, however, which is that this encoding does not
-- distribute over concatenation:
--
-- > encode (xs <> ys) /= encode xs <> encode ys
--
-- since these will look like
--
-- > <LIST_BEGIN> x1 x2 .. xn y1 y2 .. ym <BREAK>
-- > <LIST_BEGIN> x1 x2 .. xn <BREAK> <LIST_BEGIN> y1 y2 .. ym <BREAK>
--
-- respectively.
--
-- 'EncodedElems' is the encoding of /just/ the elements of the list (no list
-- header and no break marker), so that 'encodeElems' /does/ distribute over
-- concatenation:
--
-- > encodeElems (xs ++ ys) = encodeElems xs <> encodeElems ys
--
-- allowing us to encode fragments of the list at a time.
newtype EncodedElems a = EncodedElems CBOR.Encoding
  -- Since GHC 8.4 (base-4.11) 'Semigroup' is a superclass of 'Monoid', so a
  -- 'Monoid' instance cannot be derived on its own.
  deriving (Semigroup, Monoid)
-- | Encode list elements, back to back, without any list header or break
-- marker.
--
-- See 'EncodedElems' for a detailed discussion.
encodeElems :: Serialise a => [a] -> EncodedElems a
encodeElems = EncodedElems . foldMap CBOR.encode
{------------------------------------------------------------------------------- | |
Lazy decoding of lists | |
-------------------------------------------------------------------------------} | |
-- | The (lazily produced) result of running the list decoder over some input.
data DecodeLazy a
  = Elem a (DecodeLazy a)
    -- ^ An element was decoded; the rest of the result follows it.
  | Suspended (DecoderState a)
    -- ^ The input ran out; feed more to 'decodeLazy' with this state.
  | Failed String
    -- ^ Decoding failed with the given message.
  | End BS.L.ByteString
    -- ^ The end of the list was reached; carries any input that followed it.
-- | Where the lazy list decoder is up to; pass it back to 'decodeLazy'
-- together with more input to resume.
data DecoderState a
  = SkipHeader (Decoder (Maybe Int))
    -- ^ We're still waiting to skip the list header. Its result is the
    -- number of elements, if the header states one.
  | NextElem (Decoder (Maybe a)) (Maybe Int)
    -- ^ We're waiting for more input until we can decode the next element
    -- (or know that we have reached the end of the list).
    --
    -- We may or may not know how many elements we are still expecting.
-- | The state to begin decoding a fresh list from: nothing has been consumed
-- yet, so the list header is the first thing to read.
initDecoderState :: DecoderState a
initDecoderState =
  SkipHeader (CBOR.deserialiseIncremental CBOR.decodeListLenOrIndef)
-- | Lazily decode a list from a bytestring that holds the complete input.
--
-- Anything after the end of the list is ignored. A decoding failure, or input
-- that stops before the list is terminated, is reported with 'error' when the
-- corresponding part of the result list is demanded.
decodeAllLazy :: Serialise a => BS.L.ByteString -> [a]
decodeAllLazy bs = collect (decodeLazy initDecoderState bs)
  where
    collect :: DecodeLazy r -> [r]
    collect result = case result of
      Elem x rest  -> x : collect rest
      End _        -> []
      Suspended _  -> error "decodeAllLazy: list not terminated"
      Failed msg   -> error ("decodeAllLazy: " ++ msg)
-- | Feed a piece of input to the incremental list decoder.
--
-- Begin with 'initDecoderState'. Decoded elements are emitted as 'Elem's as
-- soon as they are available. When the input runs out part way through the
-- list, the result ends in 'Suspended', whose state can be passed back here
-- together with the next piece of input. Once the list's end is reached
-- (its break marker, or the stated number of elements), the remaining input
-- is returned in 'End'.
--
-- NOTE(review): the local signatures below refer to the outer @a@ (and
-- 'decodeElem' needs its 'Serialise' constraint), so this relies on
-- ScopedTypeVariables; this module's LANGUAGE pragmas do not enable it —
-- confirm it is switched on elsewhere (e.g. the cabal file).
decodeLazy :: forall a. Serialise a
           => DecoderState a
           -> BS.L.ByteString
           -> DecodeLazy a
decodeLazy st bs = resume st (BS.L.toChunks bs)
  where
    resume :: DecoderState a -> [BS.S.ByteString] -> DecodeLazy a
    resume (SkipHeader d)      = skipHeader d
    resume (NextElem d mCount) = nextElem d mCount

    -- Drive the header decoder until it yields the (optional) element count.
    skipHeader :: Decoder (Maybe Int) -> [BS.S.ByteString] -> DecodeLazy a
    skipHeader d chunks = case d of
      Done rest _ mCount -> nextElem decodeElem mCount (cons rest chunks)
      Partial k -> case chunks of
        []         -> Suspended (SkipHeader d)
        c : more   -> skipHeader (k (Just c)) more
      Fail _ _ err -> Failed err

    -- Drive the element decoder; a known count of 0 means the list is done.
    nextElem :: Decoder (Maybe a) -> Maybe Int -> [BS.S.ByteString]
             -> DecodeLazy a
    nextElem _ (Just 0) chunks = End (BS.L.fromChunks chunks)
    nextElem d mCount chunks = case d of
      Partial k -> case chunks of
        []         -> Suspended (NextElem d mCount)
        c : more   -> nextElem (k (Just c)) mCount more
      Fail _ _ err -> Failed err
      Done rest _ mElem ->
        let remaining = cons rest chunks
        in case mElem of
             Nothing -> End (BS.L.fromChunks remaining)
             Just x  -> Elem x (nextElem decodeElem (pred mCount) remaining)

    -- Decoder for a single element; 'Nothing' signals the list's break marker.
    decodeElem :: Decoder (Maybe a)
    decodeElem = CBOR.deserialiseIncremental $ do
      atBreak <- CBOR.decodeBreakOr
      if atBreak
        then return Nothing
        else fmap Just CBOR.decode
-- | Prepend a chunk to a chunk list, dropping it if it is empty.
cons :: BS.S.ByteString -> [BS.S.ByteString] -> [BS.S.ByteString]
cons chunk rest = if BS.S.null chunk then rest else chunk : rest
-- | Decrement an optional count; an unknown count stays unknown.
pred :: Maybe Int -> Maybe Int
pred = fmap (subtract 1)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE ScopedTypeVariables #-} | |
module SequenceFiles ( | |
writeBinaryFileSequence, | |
appendBinaryFileSequence, | |
hPutBinaryFileSequence, | |
readBinaryFileSequenceLazy, | |
withBinaryFileSequenceLazy, | |
withBinaryFileSequence, | |
) where | |
import Data.Monoid | |
--import qualified Data.Binary as B | |
--import qualified Data.Binary.Put as B | |
import qualified Data.Binary.Serialise.CBOR as B | |
import qualified Data.Binary.Serialise.CBOR.Write as B | |
import qualified Data.Binary.Get as B | |
import qualified Data.ByteString as BS | |
import qualified Data.ByteString.Lazy as LBS | |
import qualified Data.ByteString.Builder.Extra as BS | |
import Data.IORef | |
import Foreign | |
import GHC.ForeignPtr (mallocPlainForeignPtrBytes) | |
import System.IO | |
import System.IO.Error | |
import Control.Exception | |
-- | Write a file consisting of a sequence of items, replacing any existing
-- contents. The items are written out incrementally.
--
-- The file format is simply the binary-encoded items back to back.
--
writeBinaryFileSequence :: B.Serialise a => FilePath -> [a] -> IO ()
writeBinaryFileSequence file xs = withFile file WriteMode writeItems
  where
    writeItems hnd = hPutBinaryFileSequence hnd xs
-- | Append a sequence of items to the end of a file, creating the file if it
-- does not exist. The items are written out incrementally.
--
-- The file format is simply the binary-encoded items back to back, so
-- appending to a file written by 'writeBinaryFileSequence' yields a file
-- holding the first sequence followed by the second.
--
appendBinaryFileSequence :: B.Serialise a => FilePath -> [a] -> IO ()
appendBinaryFileSequence file xs =
    withFile file AppendMode $ \hnd ->
      hPutBinaryFileSequence hnd xs
-- | Write a sequence of items to a handle, back to back, incrementally.
--
-- A fresh output buffer of 'BS.defaultChunkSize' bytes is allocated per call.
hPutBinaryFileSequence :: B.Serialise a => Handle -> [a] -> IO ()
hPutBinaryFileSequence hnd xs =
    newBuffer BS.defaultChunkSize >>= \buf ->
      hPutBinaryFileSequenceWithBuffer hnd buf xs
-- | Write a sequence of items to a handle, using the given 'Buffer' as the
-- scratch space for the serialised output.
--
-- The builder is run directly into the buffer, and whatever it wrote is
-- flushed to the handle after every step, so the buffer starts each step
-- empty. If the builder asks for more contiguous space than the buffer has,
-- a larger buffer is allocated and used from then on. Chunks the builder
-- hands over ready-made are written to the handle directly.
hPutBinaryFileSequenceWithBuffer :: B.Serialise a
                                 => Handle -> Buffer -> [a] -> IO ()
hPutBinaryFileSequenceWithBuffer hnd buf0 xs =
    loop buf0 (BS.runBuilder (B.toBuilder (foldMap B.encode xs)))
  where
    loop :: Buffer -> BS.BufferWriter -> IO ()
    loop buf writer = do
      step <- withBuffer buf $ \ptr size -> do
        -- fill the buffer, then write out every byte that was produced
        (written, step) <- writer ptr size
        hPutBuf hnd ptr written
        return step
      continue buf step

    continue :: Buffer -> BS.Next -> IO ()
    continue _ BS.Done = return ()
    continue buf (BS.More minSize writer)
      -- very unlikely, since the buffer is flushed after every step
      | bufferSize buf < minSize = newBuffer minSize >>= \bigger -> loop bigger writer
      | otherwise                = loop buf writer
    continue buf (BS.Chunk chunk writer) = do
      BS.hPut hnd chunk
      loop buf writer
-- | An output buffer: a block of memory together with its size in bytes.
data Buffer = Buffer
    {-# UNPACK #-} !(ForeignPtr Word8)
    {-# UNPACK #-} !Int
-- | The capacity of a 'Buffer', in bytes.
bufferSize :: Buffer -> Int
bufferSize (Buffer _ size) = size
-- | Allocate a 'Buffer' holding the given number of bytes.
newBuffer :: Int -> IO Buffer
newBuffer size =
    mallocPlainForeignPtrBytes size >>= \fptr -> return $! Buffer fptr size
-- | Run an action with a pointer to the buffer's memory and the buffer's
-- size; the memory is kept alive while the action runs.
withBuffer :: Buffer -> (Ptr Word8 -> Int -> IO a) -> IO a
withBuffer (Buffer fptr size) action =
    withForeignPtr fptr (`action` size)
-- | Read a file consisting of a sequence of items, one item at a time. The
-- body action is given an action that, each time it is run, decodes and
-- returns the next item, or @Nothing@ once the file has been consumed.
--
-- Input is read in chunks of 'BS.defaultChunkSize' bytes; bytes left over
-- after an item are kept for the next call. A decoding failure is raised as
-- an 'IOError' of type 'userErrorType' that refers to the file's handle.
--
-- The file format is that used by 'writeBinaryFileSequence'.
--
withBinaryFileSequence :: forall a b. B.Serialise a
                       => FilePath
                       -> (IO (Maybe a) -> IO b)
                       -> IO b
withBinaryFileSequence file action = do
    leftoverRef <- newIORef BS.empty
    withFile file ReadMode $ \hnd ->
      action (readNextItem hnd leftoverRef)
  where
    freshDecoder :: B.Decoder a
    freshDecoder = B.deserialiseIncremental

    -- Decode the next item, starting from any leftover bytes; Nothing at EOF.
    readNextItem :: Handle -> IORef BS.ByteString -> IO (Maybe a)
    readNextItem hnd leftoverRef = do
      leftover <- readIORef leftoverRef
      start <- if BS.null leftover
                 then BS.hGetSome hnd BS.defaultChunkSize
                 else return leftover
      if BS.null start
        then return Nothing
        else fmap Just (feed hnd leftoverRef (B.pushChunk freshDecoder start))

    -- Keep reading chunks into the decoder until it finishes or fails.
    feed :: Handle -> IORef BS.ByteString -> B.Decoder a -> IO a
    feed hnd leftoverRef decoder = case decoder of
      B.Partial k -> do
        chunk <- BS.hGetSome hnd BS.defaultChunkSize
        -- an empty read means end of file, which the decoder sees as Nothing
        feed hnd leftoverRef (k (if BS.null chunk then Nothing else Just chunk))
      B.Done rest _ x -> do
        writeIORef leftoverRef rest
        return x
      B.Fail _ _ msg ->
        ioError $ mkIOError userErrorType msg (Just hnd) Nothing
-- | Read a file consisting of a sequence of items. The file is read lazily
-- and the items are decoded on demand; the body action is given the lazy
-- list of items. The handle is closed when the body returns, so the items
-- must be consumed within the body action.
--
-- The file format is that used by 'writeBinaryFileSequence'.
--
withBinaryFileSequenceLazy :: forall a b. B.Serialise a
                           => FilePath -> ([a] -> IO b) -> IO b
withBinaryFileSequenceLazy file action =
    withFile file ReadMode $ \hnd -> do
      contents <- LBS.hGetContents hnd
      action (decodeSequence file contents)
-- | Read a file consisting of a sequence of items. The file is read lazily
-- and the items are decoded on demand, as the returned list is consumed.
--
-- The file format is that used by 'writeBinaryFileSequence'.
--
readBinaryFileSequenceLazy :: forall a. B.Serialise a => FilePath -> IO [a]
readBinaryFileSequenceLazy file = do
    contents <- LBS.readFile file
    return (decodeSequence file contents)
-- | Lazily decode a sequence of back-to-back items.
--
-- Decoding stops cleanly when the input is exhausted at an item boundary, so
-- an empty input (such as a file written by 'writeBinaryFileSequence' from an
-- empty list) yields @[]@. If the input ends part way through an item, the
-- decoder is told that the input has ended. A decoding failure is thrown as an
-- 'IOError' ('userErrorType') naming the file. The error surfaces when the
-- list is consumed up to the failing item.
decodeSequence :: forall a. B.Serialise a => FilePath -> LBS.ByteString -> [a]
decodeSequence file = startItem . LBS.toChunks
  where
    initialDecoder :: B.Decoder a
    initialDecoder = B.deserialiseIncremental

    -- Begin decoding the next item, unless no input is left at all.
    -- Empty chunks (e.g. an empty trailing remainder) are skipped so that the
    -- end of input is recognised instead of being fed to a fresh decoder.
    startItem :: [BS.ByteString] -> [a]
    startItem chunks = case dropWhile BS.null chunks of
      []       -> []
      nonEmpty -> go initialDecoder nonEmpty

    go :: B.Decoder a -> [BS.ByteString] -> [a]
    go (B.Partial k) []             = go (k Nothing) []
    go (B.Partial k) (chunk:chunks) = go (k (Just chunk)) chunks
    go (B.Done trailing _ x) chunks = x : startItem (trailing : chunks)
    go (B.Fail _ _ msg) _           = throw ioerr
      where ioerr = mkIOError userErrorType msg Nothing (Just file)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment