Skip to content

Instantly share code, notes, and snippets.

@quchen
Last active December 27, 2015 17:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save quchen/7362643 to your computer and use it in GitHub Desktop.
Save quchen/7362643 to your computer and use it in GitHub Desktop.
This is a mock-up of an extended "spawn" method for Pipes.Concurrent that keeps track of metadata of the buffers. It should be fully compatible with the existing API.
-- | Store the metadata of a buffer.
data BufferMeta = BufferMeta { currentSize :: Int
, maxSize :: Maybe Int
}
-- | Like spawn', but also keeps track of metadata associated with the buffer.
spawn'' :: Buffer a
-> IO (Output a, Input a, STM (), STM BufferMeta)
spawn'' buffer = do
(read, write, meta) <- case buffer of
-- Keep track of current size, and store the max size.
Bounded n -> do
q <- S.newTBQueueIO n
c <- newTVarIO (0 :: Int)
let read = modifyTVar' c (subtract 1) >> S.readTBQueue q
write = modifyTVar' c (+1) >> S.writeTBQueue q
size = readTVar c
maxSize = return (Just n)
meta = bufferMeta <$> readTVar c <*> pure (Just n)
return (read, write, meta)
-- Like Bounded, but without a max size.
Unbounded -> do
q <- S.newTQueueIO
c <- newTVarIO (0 :: Int)
let read = modifyTVar' c (subtract 1) >> S.readTQueue q
write = modifyTVar' c (+1) >> S.writeTQueue q
meta = BufferMeta <$> readTVar c <*> pure Nothing
return (read, write, meta)
-- A TMVar has a constant size of 1. To reduce the bookkeeping overhead
-- isEmptyTMVar is used, instead of updating the filling status each
-- time something is written/read.
Single -> do
m <- S.newEmptyTMVarIO
let read = S.takeTMVar m
write = S.putTMVar m
meta = BufferMeta
<$> (bool <$> isEmptyTMVar m)
<*> pure (Just 1)
bool p = if p then 0 else 1
return (S.takeTMVar m, S.putTMVar m, meta)
-- Latest has a max size of 1 and is always full.
Latest a -> do
t <- S.newTVarIO a
return (S.readTVar t, S.writeTVar t, return (BufferMeta 1 1))
-- (Below is the rest of the procedure to take care of GC etc.)
return (read, write, seal, meta)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment