Skip to content

Instantly share code, notes, and snippets.

@Akii
Created March 17, 2019 18:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Akii/eef3b75258c1c721d22260b171075210 to your computer and use it in GitHub Desktop.
Save Akii/eef3b75258c1c721d22260b171075210 to your computer and use it in GitHub Desktop.
chunkChan :: Int -> Int -> TChan a -> IO [a]
chunkChan numItems timeoutSeconds chan = do
maybeRead <- timeout (timeoutSeconds * 60 * 1000) . atomically $ replicateM numItems (readTChan chan)
case maybeRead of
Just as -> return as
Nothing -> do
as <- emptyChannel chan
if null as
then chunkChan numItems timeoutSeconds chan
else return as
emptyChannel :: TChan a -> IO [a]
emptyChannel = undefined
@Akii
Copy link
Author

Akii commented Mar 17, 2019

A version that does not explode STM transactions:

chunkChan :: Int -> Int -> TChan a -> IO [a]
chunkChan numItems timeoutSeconds chan = do
  a <- atomically $ readTChan chan
  computeRef <- newIORef [a]

  let takeOne = atomically (readTChan chan)
      appendOne = takeOne >>= modifyIORef' computeRef . (:)
      appendN = replicateM_ (numItems - 1) appendOne

  _ <- timeout (timeoutSeconds * 1000000) appendN
  reverse <$> readIORef computeRef

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment