Skip to content

Instantly share code, notes, and snippets.

@shlevy
Created July 24, 2014 19:51
Show Gist options
  • Save shlevy/33e8483e247c99cd9f7f to your computer and use it in GitHub Desktop.
Save shlevy/33e8483e247c99cd9f7f to your computer and use it in GitHub Desktop.
cloneSource
{-# LANGUAGE FlexibleContexts, LambdaCase #-}
module Main where
import Conduit.Simple
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.Lifted
import Control.Concurrent.STM
import Control.Concurrent.STM.TBMQueue
import Control.Concurrent.STM.TMQueue
import Control.Exception.Lifted (finally)
import Control.Monad
import Control.Monad.Base
import Control.Monad.Trans
import Control.Monad.Trans.Control
import Control.Monad.Trans.Either
import Data.ByteString (ByteString)
-- | Drain a source into a freshly created 'TBMQueue' on a forked thread.
--
-- The queue is created with 'newTBMQueueIO' using the given capacity and is
-- returned immediately; the elements are written by the background thread.
-- The queue is closed once the source has been fully consumed, and also if
-- draining the source throws, so readers always observe an end-of-stream
-- instead of waiting on a queue that nobody will ever close.
sinkTBMQueue :: (MonadBaseControl IO m)
             => Int                      -- ^ capacity passed to 'newTBMQueueIO'
             -> Sink a m (TBMQueue a)
sinkTBMQueue cap src = do
  q <- liftBase $ newTBMQueueIO cap
  _ <- fork $ drain q `finally` closeQueue q
  return q
  where
    -- Fold over the source, threading the queue through as the accumulator.
    drain q = sink q push src >> return ()

    -- Runs on every exit path of the worker thread (normal or exceptional).
    closeQueue q = liftBase . atomically $ closeTBMQueue q

    push q x =
      lift . liftBase . atomically $ writeTBMQueue q x >> return q
-- | Drain a source into a freshly created 'TMQueue' on a forked thread.
--
-- The queue is returned immediately; the elements are written by the
-- background thread. The queue is closed once the source has been fully
-- consumed, and also if draining the source throws, so readers always
-- observe an end-of-stream instead of waiting on a queue that nobody will
-- ever close.
sinkTMQueue :: (MonadBaseControl IO m)
            => Sink a m (TMQueue a)
sinkTMQueue src = do
  q <- liftBase newTMQueueIO
  _ <- fork $ drain q `finally` closeQueue q
  return q
  where
    -- Fold over the source, threading the queue through as the accumulator.
    drain q = sink q push src >> return ()

    -- Runs on every exit path of the worker thread (normal or exceptional).
    closeQueue q = liftBase . atomically $ closeTMQueue q

    push q x =
      lift . liftBase . atomically $ writeTMQueue q x >> return q
-- "clone" can be arbitrary m so we can't live in STM here
-- | Duplicate the contents of a 'TMQueue' into two new queues.
--
-- A forked thread reads each element from the input queue, writes the
-- element itself to the first output queue and the result of running
-- @clone@ on it to the second. When the input queue reports end-of-stream
-- ('readTMQueue' yields 'Nothing'), or when the worker fails (for example
-- because @clone@ throws), both output queues are closed.
--
-- The two writes (and the two closes) are performed in a single STM
-- transaction each; no helper threads are spawned per element.
fanoutTMQueue :: (MonadBaseControl IO m)
              => TMQueue a     -- ^ input queue to read from
              -> (a -> m a)    -- ^ produces the copy sent to the second queue
              -> m (TMQueue a, TMQueue a)
fanoutTMQueue q clone = do
  outs <- liftBase $ liftM2 (,) newTMQueueIO newTMQueueIO
  _ <- fork $ pump outs `finally` closeBoth outs
  return outs
  where
    pump outs@(q1, q2) =
      liftBase (atomically (readTMQueue q)) >>= \case
        Nothing -> return ()
        Just x1 -> do
          -- clone runs in m, so it has to happen outside the transaction.
          x2 <- clone x1
          liftBase . atomically $ writeTMQueue q1 x1 >> writeTMQueue q2 x2
          pump outs

    closeBoth (q1, q2) =
      liftBase . atomically $ closeTMQueue q1 >> closeTMQueue q2
-- | Split one source into two.
--
-- The input source is drained into a queue via 'sinkTMQueue', that queue is
-- fanned out with 'fanoutTMQueue', and each resulting queue is exposed as a
-- source. The first source yields the original elements, the second yields
-- the values produced by @clone@.
cloneSource :: (MonadBaseControl IO m) => Source m a -> (a -> m a) -> m (Source m a, Source m a)
cloneSource src clone = do
  upstream <- sinkTMQueue src
  (left, right) <- fanoutTMQueue upstream clone
  return (fromQueue left, fromQueue right)
  where
    fromQueue queue = source (drain queue)

    -- Fold step: read until the queue reports end-of-stream, feeding every
    -- element to the consumer's step function.
    drain queue acc step = do
      next <- lift . liftBase . atomically $ readTMQueue queue
      case next of
        Nothing -> return acc
        Just item -> do
          acc' <- step acc item
          drain queue acc' step
-- | Input for the demo: the contents of @Test.hs@.
sauce :: Source IO ByteString
sauce = sourceFile "Test.hs"

-- | Writes everything it receives to the file @out1@.
sink1 :: Sink ByteString IO ()
sink1 = sinkFile "out1"

-- | Writes everything it receives to the file @out2@.
sink2 :: Sink ByteString IO ()
sink2 = sinkFile "out2"
-- | Demo: clone the contents of @Test.hs@ into @out1@ and @out2@.
--
-- Both sinks run concurrently. Running them one after the other would make
-- the second clone's queue accumulate the entire input before its sink
-- starts reading.
main :: IO ()
main = do
  (s1, s2) <- cloneSource sauce return
  void $ concurrently (sink1 s1) (sink2 s2)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment