@chrisdone
Last active February 6, 2020 11:21
Flushing source conduit
{-# LANGUAGE LambdaCase #-}
-- | Sources that flush when their monadic action takes longer than n
-- microseconds.
module Data.Conduit.Flush where
import Data.Conduit
import Data.Conduit.Internal as Pipe (ConduitT(..), Pipe(..))
import UnliftIO
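-- | Run each monadic action of the given conduit in an async thread.
-- If an action takes longer than @delay@ microseconds, the conduit
-- yields a 'Flush' and then keeps waiting for that same action to
-- finish. Outputs of the original conduit are wrapped in 'Chunk'.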
flushSource ::
     MonadUnliftIO m => Int -> ConduitT () o m r -> ConduitT () (Flush o) m r
flushSource delay (ConduitT c0) =
  ConduitT $ \rest ->
    let go =
          \case
            Leftover p l -> Leftover (go p) l
            Done r -> rest r
            HaveOutput p o -> HaveOutput (go p) (Chunk o)
            NeedInput p c -> NeedInput (go . p) (go . c)
            PipeM m ->
              PipeM
                (do original <- async m
                    result <- timeout delay (wait original)
                    case result of
                      Nothing ->
                        pure
                          (HaveOutput (PipeM (fmap go (wait original))) Flush)
                      Just p -> pure (go p))
     in go (c0 Done)
-- | Join nested flushes into one.
joinFlush :: Flush (Flush a) -> Flush a
joinFlush =
  \case
    Flush -> Flush
    Chunk (Chunk a) -> Chunk a
    Chunk Flush -> Flush
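
A minimal usage sketch, assuming the module above is on the load path as `Data.Conduit.Flush` and that the `conduit` package is installed. The names `slowSource` and `alreadyFlushing` and the chosen delays are hypothetical, picked only for illustration. It shows `flushSource` applied to a source that stalls between two outputs, and `joinFlush` collapsing the nested `Flush (Flush Int)` that results from wrapping a source that already yields `Flush` values.

```haskell
module Main where

import Control.Concurrent (threadDelay)
import Control.Monad.IO.Class (liftIO)
import Data.Conduit
import qualified Data.Conduit.List as CL
import Data.Conduit.Flush (flushSource, joinFlush)

-- Hypothetical source: yields 1, stalls for one second, then yields 2.
slowSource :: ConduitT () Int IO ()
slowSource = do
  yield 1
  liftIO (threadDelay 1000000)
  yield 2

-- Hypothetical source that already speaks the 'Flush' protocol.
alreadyFlushing :: ConduitT () (Flush Int) IO ()
alreadyFlushing = do
  yield (Chunk 1)
  yield Flush

main :: IO ()
main = do
  -- The 0.1 s limit is far shorter than the 1 s stall, so a 'Flush'
  -- is emitted while the stalled action is still running.
  timed <- runConduit (flushSource 100000 slowSource .| CL.consume)
  print timed
  -- Wrapping a flushing source nests the type; 'joinFlush' flattens it.
  flat <-
    runConduit
      (flushSource 100000 alreadyFlushing .| CL.map joinFlush .| CL.consume)
  print flat
```

With these delays, the first list would be expected to contain a `Flush` between `Chunk 1` and `Chunk 2`. The exact interleaving depends on timing, so treat the limit as a tuning knob and not as a guarantee.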