@Akii
Created March 17, 2019 18:41
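
import Control.Concurrent.STM (TChan, atomically, readTChan)
import Control.Monad (replicateM)
import System.Timeout (timeout)

chunkChan :: Int -> Int -> TChan a -> IO [a]
chunkChan numItems timeoutSeconds chan = do
  -- timeout takes microseconds, so seconds are scaled by 1,000,000
  maybeRead <- timeout (timeoutSeconds * 1000000) . atomically $ replicateM numItems (readTChan chan)
  case maybeRead of
    Just as -> return as
    Nothing -> do
      -- Timed out before a full chunk arrived: take whatever is queued
      as <- emptyChannel chan
      if null as
        then chunkChan numItems timeoutSeconds chan
        else return as

-- Should return everything currently in the channel without blocking (still a stub)
emptyChannel :: TChan a -> IO [a]
emptyChannel = undefined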
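
The emptyChannel stub above could be filled in along these lines. This is only a sketch, assuming the intent is "return everything currently queued without blocking"; drainSTM and demo are hypothetical names introduced for illustration, and only the stm package is used.

```haskell
import Control.Concurrent.STM
  (STM, TChan, atomically, newTChanIO, tryReadTChan, writeTChan)

-- Hypothetical helper: keep reading until tryReadTChan reports an empty channel.
drainSTM :: TChan a -> STM [a]
drainSTM chan = do
  next <- tryReadTChan chan
  case next of
    Nothing -> pure []
    Just a  -> (a :) <$> drainSTM chan

-- Assumed meaning of the stub: drain the channel in a single transaction.
emptyChannel :: TChan a -> IO [a]
emptyChannel = atomically . drainSTM

-- Illustration only: the first drain sees three items, the second sees none.
demo :: IO ([Int], [Int])
demo = do
  chan <- newTChanIO
  atomically (mapM_ (writeTChan chan) [1, 2, 3])
  first <- emptyChannel chan
  second <- emptyChannel chan
  pure (first, second)
```

Because tryReadTChan never retries, this transaction cannot block; it simply stops at the first Nothing.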
@dmwit

dmwit commented Mar 17, 2019

chunkChan numItems timeoutSeconds chan = do
    a <- atomically $ readTChan chan
    mas <- timeout (timeoutSeconds * 1000000) . atomically . replicateM (numItems-1) $ readTChan chan
    (a:) <$> case mas of
        Nothing -> atomically . unfoldM $ tryReadTChan chan
        Just as -> pure as
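
For anyone who wants to try the snippet above as a standalone file, here is a self-contained sketch. It assumes unfoldM is the one from the monad-loops package (Control.Monad.Loops); to avoid that dependency, a local stand-in with the same type is defined instead. The type signature and the demo function are additions for illustration, not part of the original comment.

```haskell
import Control.Concurrent.STM
  (TChan, atomically, newTChanIO, readTChan, tryReadTChan, writeTChan)
import Control.Monad (replicateM)
import System.Timeout (timeout)

-- Local stand-in for unfoldM: run the action until it yields Nothing.
unfoldM :: Monad m => m (Maybe a) -> m [a]
unfoldM step = step >>= maybe (pure []) (\a -> (a :) <$> unfoldM step)

chunkChan :: Int -> Int -> TChan a -> IO [a]
chunkChan numItems timeoutSeconds chan = do
  -- Block (without a timeout) until the first item arrives.
  a <- atomically $ readTChan chan
  -- Then wait up to the timeout for the rest of the chunk, in one transaction.
  mas <- timeout (timeoutSeconds * 1000000) . atomically . replicateM (numItems - 1) $ readTChan chan
  (a :) <$> case mas of
    -- Timed out: the big read was rolled back, so drain what is queued.
    Nothing -> atomically . unfoldM $ tryReadTChan chan
    Just as -> pure as

-- Illustration only: five items queued, read as a chunk of 3 and then a chunk of 5.
demo :: IO ([Int], [Int])
demo = do
  chan <- newTChanIO
  atomically (mapM_ (writeTChan chan) [1 .. 5])
  full <- chunkChan 3 1 chan     -- three items are ready, so no waiting
  partial <- chunkChan 5 1 chan  -- only two left: times out after 1s, then drains
  pure (full, partial)
```

Reading the first item outside the timeout is what removes the retry loop from the original: the function never returns an empty list, so there is nothing to recurse on.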

@Akii
Author

Akii commented Mar 17, 2019

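A version that avoids one large STM transaction by reading each item in its own transaction:

import Control.Concurrent.STM (TChan, atomically, readTChan)
import Control.Monad (replicateM_)
import Data.IORef (modifyIORef', newIORef, readIORef)
import System.Timeout (timeout)

chunkChan :: Int -> Int -> TChan a -> IO [a]
chunkChan numItems timeoutSeconds chan = do
  -- Block until at least one item is available
  a <- atomically $ readTChan chan
  computeRef <- newIORef [a]

  let takeOne = atomically (readTChan chan)
      -- Items are consed on, so the list is built in reverse
      appendOne = takeOne >>= modifyIORef' computeRef . (:)
      appendN = replicateM_ (numItems - 1) appendOne

  -- Collect up to numItems - 1 more items until the timeout fires
  _ <- timeout (timeoutSeconds * 1000000) appendN
  reverse <$> readIORef computeRef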
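
One thing that may be worth checking in the IORef version: timeout interrupts with an asynchronous exception, so in principle it could arrive after readTChan has committed but before modifyIORef' runs, and that item would be dropped. The following is a hedged sketch of an alternative, not the approach from this thread: it keeps one small transaction per item but moves the deadline into STM, using a TVar flag set by a timer thread. The names expired, timer, next, go and demo are all hypothetical.

```haskell
import Control.Concurrent (forkIO, killThread, threadDelay)
import Control.Concurrent.STM
  ( TChan, atomically, check, newTChanIO, newTVarIO, orElse
  , readTChan, readTVar, writeTChan, writeTVar )

chunkChan :: Int -> Int -> TChan a -> IO [a]
chunkChan numItems timeoutSeconds chan = do
  -- Block until the first item arrives; the deadline starts after that.
  a <- atomically (readTChan chan)
  expired <- newTVarIO False
  -- Assumption: a plain timer thread flips the flag once the deadline passes.
  timer <- forkIO $ do
    threadDelay (timeoutSeconds * 1000000)
    atomically (writeTVar expired True)
  let -- Prefer an available item; otherwise block until the flag is set.
      next = (Just <$> readTChan chan)
               `orElse` (Nothing <$ (readTVar expired >>= check))
      go n acc
        | n <= 0 = pure (reverse acc)
        | otherwise =
            atomically next
              >>= maybe (pure (reverse acc)) (\x -> go (n - 1) (x : acc))
  items <- go (numItems - 1) [a]
  killThread timer  -- no-op if the timer thread has already finished
  pure items

-- Illustration only: five items queued, read as a chunk of 3 and then a chunk of 5.
demo :: IO ([Int], [Int])
demo = do
  chan <- newTChanIO
  atomically (mapM_ (writeTChan chan) [1 .. 5])
  full <- chunkChan 3 1 chan
  partial <- chunkChan 5 1 chan
  pure (full, partial)
```

Since each item is either read or not read inside a single transaction, no asynchronous exception is involved in stopping the loop. The sketch does not protect the timer thread against exceptions thrown into chunkChan itself; wrapping the body in finally or bracket would be the usual way to handle that.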
