Last active
August 29, 2015 14:01
-
-
Save KholdStare/0faff6b2bd6ff01db776 to your computer and use it in GitHub Desktop.
Conduit Process
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE FlexibleContexts, RankNTypes #-} | |
module Channel ( | |
-- * Fusion across threads | |
Channel(..), | |
ChannelSource, | |
($$>), | |
(<$$), | |
) where | |
import Control.Monad.Trans.Resource (MonadBaseControl) | |
import Control.Monad.IO.Class (MonadIO) | |
import Data.Acquire (with, mkAcquire) | |
import Control.Concurrent.Async.Lifted | |
import Data.Conduit (Source, Sink, ($$)) | |
-- | Abstracts a channel for joining conduits across threads | |
data Channel h i m o = Channel { | |
_sink :: h -> Sink i m (), | |
_source :: h -> Source m o, | |
_open :: IO h, | |
_closeIn :: h -> IO (), | |
_closeOut :: h -> IO () | |
} | |
-- | Source with associated channel | |
data ChannelSource h i m o = ChannelSource { | |
_start :: Source m i, | |
_channel :: Channel h i m o | |
} | |
-- | Associate a Channel with a Source | |
infixl 1 $$> | |
($$>) :: Source m i -> Channel h i m o -> ChannelSource h i m o | |
($$>) = ChannelSource | |
-- | Fuse a Source and a Sink through a Channel | |
infixr 0 <$$ | |
(<$$) :: (MonadIO m, MonadBaseControl IO m) | |
=> ChannelSource h i m o | |
-> Sink o m r | |
-> m r | |
chSource <$$ end = do | |
let openChannel = _open $ _channel chSource | |
let closeChannelIn = _closeIn $ _channel chSource | |
let closeChannelOut = _closeOut $ _channel chSource | |
with (mkAcquire openChannel closeChannelOut) $ \h -> do | |
let channelIn = (_sink $ _channel chSource) h | |
let channelOut = (_source $ _channel chSource) h | |
let start = _start chSource | |
withAsync (channelOut $$ end) $ \outResult -> do | |
(start $$ channelIn) `finally'` (closeChannelIn h) | |
wait outResult | |
where | |
finally' action finalizer = with (mkAcquire (return ()) (const finalizer)) $ const action |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Channel | |
import Process | |
import GHC.IO.Handle.FD (stdout) | |
import Data.Conduit.Binary | |
import Control.Monad.Trans.Resource | |
main :: IO () | |
main = runResourceT $ | |
sourceFile "/etc/dictionaries-common/words" | |
$$> (processChannel (shell "grep h -")) | |
<$$ sinkHandle stdout |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE FlexibleContexts, RankNTypes #-} | |
module Process ( | |
-- * Run process | |
ProcessChannelHandle, | |
processChannel, | |
-- * Convenience re-exports | |
shell, | |
proc, | |
CreateProcess(..), | |
CmdSpec(..), | |
StdStream(..), | |
) where | |
import qualified Control.Exception as E | |
import Control.Monad (void) | |
import Control.Monad.IO.Class (MonadIO) | |
import qualified Data.ByteString as S | |
import Data.Conduit (Sink, Source) | |
import Data.Conduit.Binary (sinkHandle, sourceHandle) | |
import System.Exit (ExitCode(..)) | |
import System.IO (Handle, hSetBuffering, BufferMode(..), hClose, hSetBinaryMode) | |
import System.Process ( | |
ProcessHandle, CreateProcess, createProcess, std_in, std_out, StdStream(..), | |
waitForProcess, shell, proc, CmdSpec(..) | |
) | |
import Channel (Channel(..)) | |
data ProcessData = ProcessData { | |
_processHandle :: ProcessHandle, | |
_cin :: Handle, | |
_cout :: Handle | |
} | |
type ProcessChannelHandle m = | |
(Sink S.ByteString m (), Source m S.ByteString, ProcessData) | |
processChannelPairOpen :: (MonadIO m) | |
=> CreateProcess | |
-> IO (ProcessChannelHandle m) | |
processChannelPairOpen cp = do | |
(Just cin, Just cout, _, ph) <- createProcess cp | |
{ std_in = CreatePipe | |
, std_out = CreatePipe | |
} | |
hSetBinaryMode cin True | |
hSetBinaryMode cout True | |
hSetBuffering cin NoBuffering | |
hSetBuffering cout NoBuffering | |
let sink = sinkHandle cin | |
source = sourceHandle cout | |
return (sink, source, ProcessData ph cin cout) | |
processChannelInClose :: (ProcessChannelHandle m) -> IO () | |
processChannelInClose (_, _, ProcessData _ cin _) = hClose cin | |
processChannelOutClose :: (ProcessChannelHandle m) -> IO () | |
processChannelOutClose (_, _, ProcessData ph _ cout) = do | |
hClose cout | |
void waitForProcess' | |
where | |
waitForProcess' = | |
waitForProcess ph `E.catch` \(E.SomeException _) -> return ExitSuccess | |
-- | Create a Channel that passes data through an OS process | |
processChannel :: (MonadIO m) | |
=> CreateProcess | |
-> Channel (ProcessChannelHandle m) S.ByteString m S.ByteString | |
processChannel cp = Channel | |
(\(i, _, _) -> i) | |
(\(_, o, _) -> o) | |
(processChannelPairOpen cp) | |
processChannelInClose | |
processChannelOutClose |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment