Skip to content

Instantly share code, notes, and snippets.

@KholdStare
Last active August 29, 2015 14:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save KholdStare/0faff6b2bd6ff01db776 to your computer and use it in GitHub Desktop.
Save KholdStare/0faff6b2bd6ff01db776 to your computer and use it in GitHub Desktop.
Conduit Process
{-# LANGUAGE FlexibleContexts, RankNTypes #-}
module Channel (
-- * Fusion across threads
Channel(..),
ChannelSource,
($$>),
(<$$),
) where
import Control.Monad.Trans.Resource (MonadBaseControl)
import Control.Monad.IO.Class (MonadIO)
import Data.Acquire (with, mkAcquire)
import Control.Concurrent.Async.Lifted
import Data.Conduit (Source, Sink, ($$))
-- | A bridge used to join two conduits that run on different threads.
--
-- Type parameters: @h@ is the handle produced by '_open', @i@ is the element
-- type consumed by '_sink', @m@ is the monad the conduits run in, and @o@ is
-- the element type produced by '_source'.
data Channel h i m o = Channel
    { _sink     :: h -> Sink i m ()  -- ^ Sink that feeds @i@ values into the handle
    , _source   :: h -> Source m o   -- ^ Source that reads @o@ values out of the handle
    , _open     :: IO h              -- ^ Action that produces the handle
    , _closeIn  :: h -> IO ()        -- ^ Closes the input side of the handle
    , _closeOut :: h -> IO ()        -- ^ Closes the output side of the handle
    }
-- | A 'Source' paired with the 'Channel' its output is destined for.
data ChannelSource h i m o = ChannelSource
    { _start   :: Source m i       -- ^ Upstream source producing @i@ values
    , _channel :: Channel h i m o  -- ^ Channel associated with that source
    }
infixl 1 $$>
-- | Pair a 'Source' with the 'Channel' it should be sent through, giving a
-- 'ChannelSource'.
($$>) :: Source m i -> Channel h i m o -> ChannelSource h i m o
upstream $$> channel = ChannelSource
    { _start   = upstream
    , _channel = channel
    }
infixr 0 <$$
-- | Run a 'ChannelSource' into a 'Sink', routing the data through the
-- channel.
--
-- The channel handle is opened first and its output side is closed when the
-- whole computation finishes.  The channel's output is connected to the sink
-- in an async started with 'withAsync', while the current thread connects the
-- upstream source to the channel's input.  The input side is closed as soon
-- as that feeding step ends, and the sink's result is then awaited and
-- returned.
(<$$) :: (MonadIO m, MonadBaseControl IO m)
      => ChannelSource h i m o
      -> Sink o m r
      -> m r
chSource <$$ end =
    with (mkAcquire (_open chan) (_closeOut chan)) $ \h ->
        withAsync (_source chan h $$ end) $ \consumer -> do
            feed h `releasingAfter` _closeIn chan h
            wait consumer
  where
    chan = _channel chSource

    -- Push everything from the upstream source into the channel's input.
    feed h = _start chSource $$ _sink chan h

    -- Run @body@, with @cleanup@ registered as the release action of an
    -- otherwise empty 'Acquire'.
    releasingAfter body cleanup =
        with (mkAcquire (return ()) (\_ -> cleanup)) (\_ -> body)
import Channel
import Process
import GHC.IO.Handle.FD (stdout)
import Data.Conduit.Binary
import Control.Monad.Trans.Resource
-- | Stream the system word list through @grep h -@ and write the process's
-- output to standard output.
main :: IO ()
main = runResourceT (wordList $$> grepChannel <$$ sinkHandle stdout)
  where
    wordList    = sourceFile "/etc/dictionaries-common/words"
    grepChannel = processChannel (shell "grep h -")
{-# LANGUAGE FlexibleContexts, RankNTypes #-}
module Process (
-- * Run process
ProcessChannelHandle,
processChannel,
-- * Convenience re-exports
shell,
proc,
CreateProcess(..),
CmdSpec(..),
StdStream(..),
) where
import qualified Control.Exception as E
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO)
import qualified Data.ByteString as S
import Data.Conduit (Sink, Source)
import Data.Conduit.Binary (sinkHandle, sourceHandle)
import System.Exit (ExitCode(..))
import System.IO (Handle, hSetBuffering, BufferMode(..), hClose, hSetBinaryMode)
import System.Process (
ProcessHandle, CreateProcess, createProcess, std_in, std_out, StdStream(..),
waitForProcess, shell, proc, CmdSpec(..)
)
import Channel (Channel(..))
-- | The handles kept for a process spawned with piped stdin and stdout.
data ProcessData = ProcessData
    { _processHandle :: ProcessHandle  -- ^ Handle of the spawned process
    , _cin           :: Handle         -- ^ Pipe connected to the process's stdin
    , _cout          :: Handle         -- ^ Pipe connected to the process's stdout
    }
-- | Handle type used by process-backed channels: a sink and a source of
-- 'S.ByteString' chunks, together with the 'ProcessData' of the process.
type ProcessChannelHandle m =
    ( Sink S.ByteString m ()
    , Source m S.ByteString
    , ProcessData
    )
-- | Spawn the given process with pipes for its stdin and stdout, switch both
-- pipes to unbuffered binary mode, and return a sink writing to the
-- process's stdin, a source reading from its stdout, and the underlying
-- 'ProcessData'.
--
-- Any @std_in@ / @std_out@ settings already present in the 'CreateProcess'
-- are overridden with 'CreatePipe'.
processChannelPairOpen :: (MonadIO m)
                       => CreateProcess
                       -> IO (ProcessChannelHandle m)
processChannelPairOpen cp = do
    (Just toChild, Just fromChild, _, child) <-
        createProcess cp { std_in = CreatePipe, std_out = CreatePipe }
    let pipes = [toChild, fromChild]
    mapM_ (\hdl -> hSetBinaryMode hdl True) pipes
    mapM_ (\hdl -> hSetBuffering hdl NoBuffering) pipes
    return ( sinkHandle toChild
           , sourceHandle fromChild
           , ProcessData child toChild fromChild
           )
-- | Close the pipe connected to the process's stdin.
processChannelInClose :: ProcessChannelHandle m -> IO ()
processChannelInClose (_, _, ProcessData { _cin = stdinPipe }) =
    hClose stdinPipe
-- | Close the pipe connected to the process's stdout, then wait for the
-- process to exit.
--
-- An 'E.IOException' raised while waiting is deliberately ignored so that
-- this cleanup step stays best-effort.  Only 'E.IOException' is caught:
-- catching 'E.SomeException' here would also swallow asynchronous exceptions
-- (for example 'E.ThreadKilled' or 'E.UserInterrupt') delivered during the
-- blocking wait, so those are left to propagate.
processChannelOutClose :: ProcessChannelHandle m -> IO ()
processChannelOutClose (_, _, ProcessData ph _ cout) = do
    hClose cout
    void (waitForProcess ph `E.catch` ignoreIOError)
  where
    -- The exit code is discarded by the caller, so any placeholder will do.
    ignoreIOError :: E.IOException -> IO ExitCode
    ignoreIOError _ = return ExitSuccess
-- | Build a 'Channel' that pipes 'S.ByteString' data through an OS process:
-- whatever is sent into the channel is written to the process's stdin, and
-- the channel's output is read from the process's stdout.
processChannel :: (MonadIO m)
               => CreateProcess
               -> Channel (ProcessChannelHandle m) S.ByteString m S.ByteString
processChannel cp = Channel
    { _sink     = \(toProcess, _, _) -> toProcess
    , _source   = \(_, fromProcess, _) -> fromProcess
    , _open     = processChannelPairOpen cp
    , _closeIn  = processChannelInClose
    , _closeOut = processChannelOutClose
    }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment