Created
July 24, 2014 19:51
-
-
Save shlevy/33e8483e247c99cd9f7f to your computer and use it in GitHub Desktop.
cloneSource
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE FlexibleContexts, LambdaCase #-} | |
module Main where | |
import Data.ByteString (ByteString) | |
import Conduit.Simple | |
import Control.Concurrent.STM | |
import Control.Concurrent.STM.TMQueue | |
import Control.Concurrent.STM.TBMQueue | |
import Control.Concurrent.Lifted | |
import Control.Monad.Trans | |
import Control.Monad.Trans.Either | |
import Control.Monad.Trans.Control | |
import Control.Monad.Base | |
import Control.Concurrent.Async | |
import Control.Monad | |
import Control.Concurrent | |
sinkTBMQueue :: (MonadBaseControl IO m) | |
=> Int | |
-> Sink a m (TBMQueue a) | |
sinkTBMQueue cap source = do | |
q <- liftBase $ newTBMQueueIO cap | |
_ <- fork $ sinkQueue q | |
return q | |
where | |
sinkQueue q = do | |
sink q f source | |
liftBase . atomically $ closeTBMQueue q | |
f q elem = | |
lift . liftBase . atomically $ writeTBMQueue q elem >> return q | |
sinkTMQueue :: (MonadBaseControl IO m) | |
=> Sink a m (TMQueue a) | |
sinkTMQueue source = do | |
q <- liftBase newTMQueueIO | |
fork $ sinkQueue q | |
return q | |
where | |
sinkQueue q = do | |
sink q f source | |
liftBase . atomically $ closeTMQueue q | |
f q elem = | |
lift . liftBase . atomically $ writeTMQueue q elem >> return q | |
-- "clone" can be arbitrary m so we can't live in STM here | |
fanoutTMQueue :: (MonadBaseControl IO m) | |
=> TMQueue a | |
-> (a -> m a) | |
-> m (TMQueue a, TMQueue a) | |
fanoutTMQueue q clone = do | |
res@(q1, q2) <- liftBase createQueues | |
fork $ fanout q1 q2 | |
return res | |
where | |
createQueues = do | |
q1 <- newTMQueueIO | |
q2 <- newTMQueueIO | |
return (q1, q2) | |
fanout q1 q2 = liftBase (atomically (readTMQueue q)) >>= \case | |
Just x1 -> clone x1 >>= (\x2 -> liftBase $ do | |
a1 <- async . atomically $ writeTMQueue q1 x1 | |
a2 <- async . atomically $ writeTMQueue q2 x2 | |
wait a1 | |
wait a2) >> fanout q1 q2 | |
Nothing -> liftBase $ do | |
forkIO . atomically $ closeTMQueue q1 | |
atomically $ closeTMQueue q2 | |
cloneSource :: (MonadBaseControl IO m) => Source m a -> (a -> m a) -> m (Source m a, Source m a) | |
cloneSource src clone = do | |
q <- sinkTMQueue src | |
(q1, q2) <- fanoutTMQueue q clone | |
return (sourceQueue q1, sourceQueue q2) | |
where | |
sourceQueue q = source (go q) | |
go q z yield = lift (liftBase (atomically (readTMQueue q))) >>= \case | |
Just x -> do | |
next <- yield z x | |
go q next yield | |
Nothing -> return z | |
sauce = sourceFile "Test.hs" :: Source IO ByteString | |
sink1 = sinkFile "out1" | |
sink2 = sinkFile "out2" | |
main :: IO () | |
main = do | |
(s1, s2) <- cloneSource sauce return | |
sink1 s1 | |
sink2 s2 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment