Skip to content

Instantly share code, notes, and snippets.

@robinp
Created March 31, 2019 09:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save robinp/fa14b51425f010454a5ea5b1c9f220ce to your computer and use it in GitHub Desktop.
Save robinp/fa14b51425f010454a5ea5b1c9f220ce to your computer and use it in GitHub Desktop.
LevelDB Stream merging and grouping
{- Note: originally for treetide.com -}
{- Releasing under Apache 2.0 licence (https://www.apache.org/licenses/LICENSE-2.0), feel free to reuse accordingly. -}
{-# LANGUAGE TupleSections #-}
module Database.LevelDB.Streaming.Extended
( module X
, mergeStreams
, groupStream
)
where
import Control.Monad.IO.Class ( MonadIO )
import Database.LevelDB.Streaming as X
-- | Merge two streams into one, using the comparison function to decide
-- which of the two currently available entries is emitted first.
--
-- When the comparison returns 'EQ', the entry of the first stream is emitted
-- before the entry of the second stream.
mergeStreams
  :: (MonadIO m)
  => (a -> a -> Ordering)
  -> Stream m a
  -> Stream m a
  -> Stream m a
mergeStreams cmp (Stream pullL initL) (Stream pullR initR) = Stream
  step
  ((,,) Nothing <$> initL <*> initR)
  where
    -- The state is @(pending, leftState, rightState)@. @pending@ parks the
    -- entry that lost the previous comparison: 'Left' when it was read from
    -- the first stream, 'Right' when it was read from the second one.
    --
    -- Implementation note: a richer internal step type could probably avoid
    -- some repeated computations (the LevelS source may serve as
    -- inspiration). For instance, an exhausted first stream is stepped again
    -- on every subsequent step. Kept simple for now.
    step (pending, sl, sr) = case pending of
      Nothing        -> pullBoth sl sr
      Just (Left x)  -> pullRight x sl sr
      Just (Right y) -> pullLeft y sl sr

    -- Nothing is parked: step the first stream and, unless it merely
    -- skipped, step the second stream as well.
    pullBoth sl sr = do
      l <- pullL sl
      case l of
        Skip sl' -> return $! Skip (Nothing, sl', sr)
        Done     -> do
          r <- pullR sr
          return $! case r of
            Done        -> Done
            Skip sr'    -> Skip (Nothing, sl, sr')
            Yield y sr' -> Yield y (Nothing, sl, sr')
        Yield x sl' -> do
          r <- pullR sr
          return $! case r of
            Done        -> Yield x (Nothing, sl', sr)
            Skip sr'    -> Skip (Just (Left x), sl', sr')
            Yield y sr' -> emitSmaller x y sl' sr'

    -- An entry @x@ of the first stream is parked: only the second stream
    -- needs to be stepped.
    pullRight x sl sr = do
      r <- pullR sr
      return $! case r of
        Done        -> Yield x (Nothing, sl, sr)
        Skip sr'    -> Skip (Just (Left x), sl, sr')
        Yield y sr' -> emitSmaller x y sl sr'

    -- An entry @y@ of the second stream is parked: only the first stream
    -- needs to be stepped.
    pullLeft y sl sr = do
      l <- pullL sl
      return $! case l of
        Done        -> Yield y (Nothing, sl, sr)
        Skip sl'    -> Skip (Just (Right y), sl', sr)
        Yield x sl' -> emitSmaller x y sl' sr

    -- Given an entry @x@ of the first stream and an entry @y@ of the second
    -- stream, emit the smaller one and park the other. Ties go to @x@.
    emitSmaller x y sl sr = case cmp x y of
      GT -> Yield y (Just (Left x), sl, sr)
      _  -> Yield x (Just (Right y), sl, sr)
-- | Similar to 'Data.List.groupBy' but for 'Stream'. Buffers consecutive
-- entries for as long as the predicate holds between the most recently
-- buffered entry and the incoming one, then yields the buffered run as a
-- single list.
--
-- The entries of each yielded group appear in the order in which they were
-- received from the upstream. Empty groups are never yielded.
--
-- Note that the predicate is applied to adjacent entries (previous, next),
-- not to the first entry of the group; for a genuine equivalence relation
-- the two give the same grouping.
groupStream :: (MonadIO m) => (a -> a -> Bool) -> Stream m a -> Stream m [a]
groupStream f (Stream next0 s) = Stream next (([], ) <$> s)
  where
    -- The state is @(accum, upstreamState)@. @accum@ holds the current run
    -- newest-first so that extending it is a cons; it is reversed once when
    -- the run is emitted to restore upstream order.
    next (accum, s0) = do
      step <- next0 s0
      return $! case step of
        Done -> case accum of
          [] -> Done
          -- Flush the final run. The upstream state is kept as is, so the
          -- next step queries the finished upstream again; this relies on
          -- the upstream tolerating being stepped after 'Done' (observed to
          -- work by the original author), which avoids a separate
          -- end-of-upstream marker in the state.
          _  -> Yield (reverse accum) ([], s0)
        Skip s0'    -> Skip (accum, s0')
        Yield a s0' -> case accum of
          -- First entry of a new run: nothing to compare against yet.
          [] -> Skip ([a], s0')
          (x : _)
            | f x a     -> Skip (a : accum, s0')
            -- Run ended: emit it and start the next run with @a@.
            | otherwise -> Yield (reverse accum) ([a], s0')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment