public
Created

Threaded version of process-conduit

  • Download Gist
GBProcess.hs
Haskell
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
{-# OPTIONS -Wall #-}
 
import Control.Monad (when,forever)
import Control.Monad.Trans
import Control.Monad.Trans.Resource
import qualified Data.ByteString as B
import Data.ByteString (ByteString)
import Data.Conduit
import qualified Data.Conduit.List as CL
import qualified Data.Conduit.Binary as CB
import System.IO
import System.Process
import Control.Concurrent.MVar
import Control.Concurrent
 
-- | Maximum number of bytes requested from the child's stdout per read
-- (64 KiB).
bufSize :: Int
bufSize = 1024 * 64
 
-- | Take values out of the given 'MVar' forever, throwing each one away.
-- Never returns normally, hence the fully polymorphic result type.
drain :: MVar a -> IO b
drain box = loop
  where
    loop = takeMVar box >> loop
 
-- | Run an external process as a 'Pipe': every upstream chunk is written to
-- the child's stdin and every chunk read from its stdout is passed
-- downstream.
--
-- Two helper threads perform the blocking handle I\/O and exchange data with
-- the pipe through 'MVar's:
--
-- * the writer thread takes @Just chunk@ from @v1@ and writes it to stdin;
--   on @Nothing@ it closes stdin and then discards any further messages;
--
-- * the reader thread puts @Just chunk@ into @v2@ for every read of up to
--   'bufSize' bytes from stdout, and a final @Nothing@ once stdout is at EOF
--   or closed, or once a stop request has arrived on @v3@.
--
-- The process is acquired with 'allocate' (released by @closep@) and the two
-- threads are registered for killing with 'register' (@closet@).
pipeProcess :: MonadResource m => CreateProcess -> Pipe ByteString ByteString m ()
pipeProcess cp = flip PipeM (return ()) $ do
    v3 <- liftIO newEmptyMVar
    (_, (_, (Just cin, Just cout, _, _))) <- allocate (createp v3) closep

    -- Writer thread: upstream chunks -> child's stdin.
    v1 <- liftIO newEmptyMVar
    h1 <- liftIO $ forkIO $ forever' $ do
        inp' <- takeMVar v1
        case inp' of
            Just inp -> do
                B.hPut cin inp >> hFlush cin
                return True
            Nothing -> do
                hClose cin
                -- Never returns: absorbs any repeated end-of-input message
                -- so that its sender cannot block on a full MVar.
                drain v1

    -- Reader thread: child's stdout -> downstream chunks.
    v2 <- liftIO newEmptyMVar
    let endOfOutput = putMVar v2 Nothing >> return False
    h2 <- liftIO $ forkIO $ forever' $ do
        stop <- tryTakeMVar v3
        case stop of
            Just _ -> hClose cout >> endOfOutput
            Nothing -> do
                closed <- hIsClosed cout
                if closed
                    then endOfOutput
                    else do
                        eof <- hIsEOF cout
                        if eof
                            -- Nothing more will ever be read: close the
                            -- handle now instead of leaving it open.
                            then hClose cout >> endOfOutput
                            else do
                                xs <- B.hGetSome cout bufSize
                                putMVar v2 $ Just xs
                                return True

    _ <- register $ closet (h1, h2)
    return $ go (v1, v2)
  where
    -- Start the process with pipes on stdin and stdout; the stop-request
    -- MVar travels with the result so that 'closep' can reach it.
    createp v3 = do
        r <- createProcess cp
            { std_in = CreatePipe
            , std_out = CreatePipe
            }
        return (v3, r)

    -- Release action for the process: close stdin, ask the reader thread to
    -- stop, then wait for the process to exit.
    -- NOTE(review): stdout is closed here only indirectly, through the stop
    -- request on v3. If the reader thread has already been killed by
    -- 'closet' the request is never seen -- confirm the release order.
    closep (v3, (mcin, _, _, ph)) = do
        maybe (return ()) hClose mcin
        putMVar v3 Nothing
        _ <- waitForProcess ph
        return ()

    -- Release action for the helper threads.
    closet (h1, h2) = do
        killThread h1
        killThread h2

    -- Main loop while upstream may still deliver input: forward pending
    -- output if there is any, otherwise ask for the next input chunk.
    go (v1, v2) = do
        xs <- liftIO $ tryTakeMVar v2
        case xs of
            Just Nothing ->
                Done Nothing ()
            Just (Just out) ->
                HaveOutput (go (v1, v2)) (return ()) out
            Nothing ->
                NeedInput
                    (\inp -> do
                        liftIO $ putMVar v1 (Just inp)
                        go (v1, v2))
                    (do
                        liftIO $ putMVar v1 Nothing
                        -- No more input will arrive, so there is nothing
                        -- left to interleave with: block on the reader
                        -- thread instead of polling it in a busy loop.
                        drainOutput v2)

    -- After end of input: forward every remaining output chunk, waiting for
    -- each one, until the reader thread signals the end with Nothing.
    drainOutput v2 = do
        xs <- liftIO $ takeMVar v2
        case xs of
            Nothing -> Done Nothing ()
            Just out -> HaveOutput (drainOutput v2) (return ()) out
 
-- | Repeat an action for as long as it returns 'True'; stop right after the
-- first run that returns 'False'. The action is always run at least once.
forever' :: (Monad m) => m Bool -> m ()
forever' step = loop
  where
    loop = do
        again <- step
        if again then loop else return ()
 
-- | Run a process as a 'Source': it is fed an empty input stream
-- ('CL.sourceNull') and whatever 'conduitProcess' yields becomes the
-- source's output.
sourceProcess :: (MonadIO m,MonadResource m) => CreateProcess -> Source m B.ByteString
sourceProcess = (CL.sourceNull $=) . conduitProcess
 
-- | Run a process as a 'Conduit'. This is 'pipeProcess' under the
-- 'Conduit' type synonym.
conduitProcess :: (MonadIO m,MonadResource m) => CreateProcess -> Conduit B.ByteString m B.ByteString
conduitProcess cp = pipeProcess cp
 
-- | Like 'pipeProcess', but the process is described by a shell command
-- line (via 'shell').
pipeCmd :: (MonadIO m,MonadResource m) => String -> Pipe B.ByteString B.ByteString m ()
pipeCmd cmd = pipeProcess (shell cmd)
 
-- | Like 'sourceProcess', but the process is described by a shell command
-- line (via 'shell').
sourceCmd :: (MonadIO m,MonadResource m) => String -> Source m B.ByteString
sourceCmd cmd = sourceProcess (shell cmd)
 
-- | Like 'conduitProcess', but the process is described by a shell command
-- line (via 'shell').
conduitCmd :: (MonadIO m,MonadResource m) => String -> Conduit B.ByteString m B.ByteString
conduitCmd cmd = conduitProcess (shell cmd)
 
-- | Demo: take at most 999 bytes of the output of @dir *@, pipe them
-- through @wc@, and write the result to stdout.
main :: IO ()
main = runResourceT (listing $$ counted)
  where
    listing = sourceCmd "dir *"
    counted = CB.isolate 999 =$ conduitCmd "wc" =$ CB.sinkHandle stdout

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.