Created
April 15, 2012 23:42
-
-
Save thelff/2395361 to your computer and use it in GitHub Desktop.
Threaded version of process-conduit.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# OPTIONS -Wall #-} | |
import Control.Monad (when,forever) | |
import Control.Monad.Trans | |
import Control.Monad.Trans.Resource | |
import qualified Data.ByteString as B | |
import Data.ByteString (ByteString) | |
import Data.Conduit | |
import qualified Data.Conduit.List as CL | |
import qualified Data.Conduit.Binary as CB | |
import System.IO | |
import System.Process | |
import Control.Concurrent.MVar | |
import Control.Concurrent | |
-- | Maximum number of bytes requested from the child's stdout per read
-- (64 KiB).
bufSize :: Int
bufSize = 64 * kib
  where
    kib = 1024
-- | Repeatedly take values out of the given 'MVar' and discard them.
--
-- This action never returns normally, which is why its result type is
-- fully polymorphic.
drain :: MVar a -> IO b
drain box = loop
  where
    loop = takeMVar box >> loop
-- | Run an external process as a 'Pipe'.
--
-- Chunks received from upstream are written to the child's stdin and
-- chunks read from the child's stdout are yielded downstream. Two helper
-- threads do the blocking handle I/O and talk to the pipe through
-- one-slot 'MVar's:
--
-- * the writer thread takes @Just chunk@ / @Nothing@ from @toProc@,
--   writing chunks to stdin and closing stdin on @Nothing@;
--
-- * the reader thread puts @Just chunk@ into @fromProc@ for every read
--   from stdout and a final @Nothing@ on EOF or when told to stop.
--
-- The process and both threads are registered with the enclosing
-- resource scope so they are cleaned up when it exits.
pipeProcess :: MonadResource m => CreateProcess -> Pipe ByteString ByteString m ()
pipeProcess cp = flip PipeM (return ()) $ do
    stopSignal <- liftIO newEmptyMVar
    -- 'createp' always requests pipes for stdin and stdout, so both
    -- handles are expected to be 'Just' here.
    (_, (_, (Just cin, Just cout, _, _))) <- allocate (createp stopSignal) closep

    -- Writer: feeds the child's stdin.
    toProc <- liftIO newEmptyMVar
    writer <- liftIO $ forkIO $ forever' $ do
        msg <- takeMVar toProc
        case msg of
            Just inp -> do
                B.hPut cin inp >> hFlush cin
                return True
            Nothing -> do
                hClose cin
                -- Keep swallowing anything still sent so a late
                -- 'putMVar' can never block the pipe; never returns.
                drain toProc

    -- Reader: pulls the child's stdout.
    fromProc <- liftIO newEmptyMVar
    let endOfOutput = putMVar fromProc Nothing >> return False
    reader <- liftIO $ forkIO $ forever' $ do
        stop <- tryTakeMVar stopSignal
        case stop of
            Just _ -> do
                hClose cout
                endOfOutput
            Nothing -> do
                closed <- hIsClosed cout
                if closed
                    then endOfOutput
                    else do
                        eof <- hIsEOF cout
                        if eof
                            then endOfOutput
                            else do
                                xs <- B.hGetSome cout bufSize
                                putMVar fromProc (Just xs)
                                return True

    _ <- register $ closet (writer, reader)
    return $ go toProc fromProc
  where
    -- Start the process with stdin and stdout redirected to pipes; the
    -- stop signal travels with the result so 'closep' can reach it.
    createp sig = do
        r <- createProcess cp
            { std_in = CreatePipe
            , std_out = CreatePipe
            }
        return (sig, r)

    -- Release action for the process. Total over the handle tuple so no
    -- pattern-match failure can occur during cleanup.
    closep (sig, (mcin, mcout, _, ph)) = do
        maybe (return ()) hClose mcin
        putMVar sig Nothing
        -- Close our end of stdout before waiting: if the consumer stopped
        -- early, a child blocked writing to a full pipe would otherwise
        -- never exit and 'waitForProcess' would hang. 'hClose' on an
        -- already closed handle is a no-op.
        -- NOTE(review): assumes the helper threads were already killed
        -- (release actions running in reverse registration order) --
        -- confirm against the resourcet version in use.
        maybe (return ()) hClose mcout
        _ <- waitForProcess ph
        return ()

    -- Release action for the helper threads.
    closet (writer, reader) = do
        killThread writer
        killThread reader

    -- Main loop while upstream is still open: yield output if the reader
    -- has some ready, otherwise ask upstream for more input.
    go toProc fromProc = do
        next <- liftIO $ tryTakeMVar fromProc
        case next of
            Just Nothing ->
                Done Nothing ()
            Just (Just out) ->
                HaveOutput (go toProc fromProc) (return ()) out
            Nothing ->
                NeedInput
                    (\inp -> do
                        liftIO $ putMVar toProc (Just inp)
                        go toProc fromProc)
                    (do
                        liftIO $ putMVar toProc Nothing
                        flush fromProc)

    -- Upstream is exhausted and stdin has been closed: block on the
    -- reader instead of polling, yielding until it reports end of output.
    flush fromProc = do
        next <- liftIO $ takeMVar fromProc
        case next of
            Nothing ->
                Done Nothing ()
            Just out ->
                HaveOutput (flush fromProc) (return ()) out
-- | Run the action repeatedly for as long as it returns 'True'; stop
-- after the first run that returns 'False'.
forever' :: (Monad m) => m Bool -> m ()
forever' step = loop
  where
    loop = do
        continue <- step
        if continue then loop else return ()
-- | A 'Source' yielding the output of the given process: an empty source
-- is fused into 'conduitProcess', so the process receives no input.
sourceProcess :: (MonadIO m,MonadResource m) => CreateProcess -> Source m B.ByteString
sourceProcess = (CL.sourceNull $=) . conduitProcess
-- | A 'Conduit' that passes its input through the given process; this is
-- 'pipeProcess' at the 'Conduit' type.
conduitProcess :: (MonadIO m,MonadResource m) => CreateProcess -> Conduit B.ByteString m B.ByteString
conduitProcess cp = pipeProcess cp
-- | 'pipeProcess' for a command line run through the system shell.
pipeCmd :: (MonadIO m,MonadResource m) => String -> Pipe B.ByteString B.ByteString m ()
pipeCmd cmd = pipeProcess (shell cmd)
-- | 'sourceProcess' for a command line run through the system shell.
sourceCmd :: (MonadIO m,MonadResource m) => String -> Source m B.ByteString
sourceCmd cmd = sourceProcess (shell cmd)
-- | 'conduitProcess' for a command line run through the system shell.
conduitCmd :: (MonadIO m,MonadResource m) => String -> Conduit B.ByteString m B.ByteString
conduitCmd cmd = conduitProcess (shell cmd)
-- | Demo: take the output of @dir *@, keep at most 999 bytes of it, pass
-- that through @wc@ and write the result to stdout.
main :: IO ()
main = runResourceT (listing $$ counted)
  where
    listing = sourceCmd "dir *"
    counted = CB.isolate 999 =$ conduitCmd "wc" =$ CB.sinkHandle stdout
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment