@snoyberg
Created June 21, 2018 13:58
Ensure that a conduit produces output every X microseconds
#!/usr/bin/env stack
-- stack --resolver lts-11.10 script
{-# LANGUAGE NoImplicitPrelude #-}
import RIO
import Conduit
import Control.Concurrent.STM.TBMQueue

doesn'tStall
  :: MonadUnliftIO m
  => Int -- ^ number of microseconds
  -> ConduitT () o m () -- ^ original source
  -> (ConduitT () o m () -> m a) -- ^ what to do with modified source
  -> m a
doesn'tStall usec src inner = do
  queue <- liftIO $ newTBMQueueIO 2
  runConcurrently $
    Concurrently (filler queue) *>
    Concurrently (inner $ consumer queue)
  where
    filler queue =
      runConduit (src .| mapM_C (atomically . writeTBMQueue queue))
        `finally` atomically (closeTBMQueue queue)
    consumer queue =
        loop
      where
        loop = do
          res <- lift $ timeout usec $ atomically $ readTBMQueue queue
          case res of
            -- timeout occurred
            Nothing -> error "too slow Joe!" -- better exception type
            -- queue is closed
            Just Nothing -> pure ()
            -- more data available
            Just (Just o) -> yield o >> loop

slowSource :: ConduitT () Int IO ()
slowSource = forM_ [1..] $ \i -> do
  yield i
  threadDelay $ i * 100000

main :: IO ()
main = doesn'tStall 1000000 slowSource $ \src ->
  runConduit $ src .| printC
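
The `-- better exception type` comment in the script hints that the `error` call is a placeholder. One way this could look, as a minimal sketch using only `base`: `StallException` and `orStall` are hypothetical names that do not appear in the gist, and the sketch uses `System.Timeout.timeout` in plain `IO` rather than the RIO version used in the script.

```haskell
import Control.Concurrent (threadDelay)
import Control.Exception (Exception, throwIO, try)
import System.Timeout (timeout)

-- | Hypothetical exception type (not part of the original gist). It carries
-- the timeout, in microseconds, that was exceeded.
newtype StallException = StallException Int
  deriving Show

instance Exception StallException

-- | Hypothetical helper: run an action and throw 'StallException' if it
-- does not finish within the given number of microseconds.
orStall :: Int -> IO a -> IO a
orStall usec action = do
  res <- timeout usec action
  case res of
    Nothing -> throwIO (StallException usec) -- timeout occurred
    Just a  -> pure a                        -- finished in time

main :: IO ()
main = do
  -- Completes immediately, so no exception is thrown.
  fast <- try (orStall 500000 (pure "ok"))
  print (fast :: Either StallException String)
  -- Sleeps for longer than the 10 ms limit, so the typed exception is thrown.
  slow <- try (orStall 10000 (threadDelay 500000))
  print (slow :: Either StallException ())
```

With a dedicated type, a caller can catch the stall specifically with `try` or `catch` instead of matching on an `ErrorCall` message.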