Skip to content

@thelff /GBProcess.hs
Created

Embed URL

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
threaded version of process conduit
{-# OPTIONS -Wall #-}
import Control.Monad (when,forever)
import Control.Monad.Trans
import Control.Monad.Trans.Resource
import qualified Data.ByteString as B
import Data.ByteString (ByteString)
import Data.Conduit
import qualified Data.Conduit.List as CL
import qualified Data.Conduit.Binary as CB
import System.IO
import System.Process
import Control.Concurrent.MVar
import Control.Concurrent
bufSize :: Int
bufSize = 64*1024
drain :: MVar a -> IO b
drain v = forever $ takeMVar v >>= \_ -> return ()
pipeProcess :: MonadResource m => CreateProcess -> Pipe ByteString ByteString m ()
pipeProcess cp = flip PipeM (return ()) $ do
v3 <- liftIO newEmptyMVar
(_, (_,(Just cin, Just cout, _, ph))) <- allocate (createp v3) closep
v1 <- liftIO newEmptyMVar
h1 <- liftIO $ forkIO $ forever' $ do
inp' <- takeMVar v1
case inp' of
Just inp -> do
B.hPut cin inp >> hFlush cin
return True
Nothing -> do
hClose cin
_ <- drain v1
return False
v2 <- liftIO newEmptyMVar
h2 <- liftIO $ forkIO $ forever' $ do
zz <- tryTakeMVar v3
case zz of
Just zs -> do
hClose cout
putMVar v2 Nothing
return False
Nothing -> do
closed <- hIsClosed cout
if closed then do
putMVar v2 Nothing
return False
else do
eof <- hIsEOF cout
if eof then do
putMVar v2 Nothing
return False
else do
xs <- B.hGetSome cout bufSize
putMVar v2 $ Just xs
return True
_ <- register $ closet (h1,h2)
return ()
return $ go (v1,v2) cin cout ph
where
createp v3 = do
r <- createProcess cp
{ std_in = CreatePipe
, std_out = CreatePipe
}
return (v3,r)
closep (v3,(Just cin, Just cout, _, ph)) = do
hClose cin
putMVar v3 Nothing
-- threadDelay 2000
--hClose cout
_ <- waitForProcess ph
return ()
closet (h1,h2) = do
killThread h1
killThread h2
go (v1,v2) cin cout ph = do
xs <- liftIO $ tryTakeMVar v2
-- liftIO $ hPutStrLn stderr $ show xs
case xs of
Just Nothing ->
Done Nothing ()
Just (Just out) -> do
-- liftIO $ hPutStrLn stderr $ "HaveOutput: out[" ++ show out ++ "]"
HaveOutput (go (v1,v2) cin cout ph) (return ()) out
Nothing ->
NeedInput
(\inp -> do
liftIO $ putMVar v1 (Just inp)
go (v1,v2) cin cout ph)
(do
liftIO $ putMVar v1 Nothing
go (v1,v2) cin cout ph)
forever' :: (Monad m) => m Bool -> m ()
forever' ma = ma >>= \a -> when a $ forever' ma
-- | Source of process
sourceProcess :: (MonadIO m,MonadResource m) => CreateProcess -> Source m B.ByteString
sourceProcess cp = CL.sourceNull $= conduitProcess cp
-- | Conduit of process
conduitProcess :: (MonadIO m,MonadResource m) => CreateProcess -> Conduit B.ByteString m B.ByteString
conduitProcess = pipeProcess
-- | Pipe of shell command
pipeCmd :: (MonadIO m,MonadResource m) => String -> Pipe B.ByteString B.ByteString m ()
pipeCmd = pipeProcess . shell
-- | Source of shell command
sourceCmd :: (MonadIO m,MonadResource m) => String -> Source m B.ByteString
sourceCmd = sourceProcess . shell
-- | Conduit of shell command
conduitCmd :: (MonadIO m,MonadResource m) => String -> Conduit B.ByteString m B.ByteString
conduitCmd = conduitProcess . shell
main::IO ()
main = runResourceT $ sourceCmd "dir *" $$ CB.isolate 999 =$ conduitCmd "wc" =$ CB.sinkHandle stdout
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.