Last active
August 29, 2015 14:11
-
-
Save bitonic/a48518b54615d17daad4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# OPTIONS_GHC -Wall -fno-warn-name-shadowing #-} | |
{-# LANGUAGE RankNTypes #-} | |
{-# LANGUAGE OverloadedStrings #-} | |
{-# LANGUAGE GADTs #-} | |
{-# LANGUAGE ScopedTypeVariables #-} | |
-- | Start the processes in order, from "begin" to "end" (every later stage
-- connects to the port served by the stage before it), e.g.
--
-- @
-- ./MapReduce begin 10000 &
-- ./MapReduce segment 0 10001 '127.0.0.1' 10000 &
-- ./MapReduce segment 1 10002 '127.0.0.1' 10001 &
-- ./MapReduce end '127.0.0.1' 10002 &
-- @
module Main where | |
import Control.Monad | |
import qualified Data.ByteString as B | |
import Data.Serialize | |
import Pipes | |
import Pipes.Network.TCP | |
import System.Environment | |
-- | Everything in a pipeline that comes after the producer: zero or more
-- 'Pipe' stages terminated by a 'Consumer'.  The type index @i@ is the type
-- of values the first remaining stage expects as input.
data Pipeline_ i m where
  -- | The last stage, which consumes the stream.
  PLEnd  :: Consumer i m () -> Pipeline_ i m
  -- | An intermediate stage followed by the rest of the pipeline.  The
  -- stage's output type @o@ is hidden; only its 'Serialize' instance is kept.
  PLCons :: (Serialize o) => Pipe i o m () -> Pipeline_ o m -> Pipeline_ i m
-- | A complete pipeline: a producer paired with the stages that consume its
-- output.  The element type flowing out of the producer is existentially
-- hidden; only its 'Serialize' instance is retained.
data Pipeline m where
  Pipeline :: (Serialize a) => Producer a m () -> Pipeline_ a m -> Pipeline m
infixr 9 >-->
-- | Put an intermediate 'Pipe' stage in front of the rest of a pipeline.
-- Operator form of 'PLCons'.
(>-->) :: (Serialize b) => Pipe a b m () -> Pipeline_ b m -> Pipeline_ a m
stage >--> rest = PLCons stage rest
infixr 9 |-->
-- | Attach a producer to the stages that consume it, giving a complete
-- 'Pipeline'.  Operator form of the 'Pipeline' constructor.
(|-->) :: (Serialize a) => Producer a m () -> Pipeline_ a m -> Pipeline m
source |--> stages = Pipeline source stages
infixr 9 >--|
-- | Finish a pipeline: the last intermediate 'Pipe' followed directly by the
-- terminating 'Consumer'.
(>--|) :: (Serialize b) => Pipe a b m () -> Consumer b m () -> Pipeline_ a m
lastStage >--| sink = PLCons lastStage (PLEnd sink)
-- | Pick the @n@-th intermediate 'Pipe' of a pipeline (counting from 0, the
-- producer not included) and pass it to the continuation.
--
-- The continuation is rank-2 because the stage's input and output types are
-- hidden inside the 'Pipeline'; all it may rely on is that both are
-- 'Serialize'.  Calls 'error' if @n@ is negative or if the final consumer is
-- reached before the requested stage.
getSegment
  :: forall m x.
     Pipeline m -> Int
  -> (forall a b. (Serialize a, Serialize b) => Pipe a b m () -> x)
  -> x
getSegment (Pipeline _ stages) n cont
  | n < 0     = error "getSegment: negative index"
  | otherwise = walk n stages
  where
    -- Step past @i@ stages, then hand over the one we land on.
    walk :: (Serialize b) => Int -> Pipeline_ b m -> x
    walk i (PLCons stage rest)
      | i == 0    = cont stage
      | otherwise = walk (i - 1) rest
    walk _ (PLEnd _) = error "getSegment: out of bounds"
-- | Run one intermediate stage between two sockets: bytes read from @sckL@
-- (in chunks of at most 4096 bytes) are decoded, pushed through the pipe,
-- re-encoded and written to @sckR@.
runSegment
  :: (Serialize a, Serialize b, MonadIO m) => Socket -> Socket -> Pipe a b m () -> m ()
runSegment sckL sckR p = runEffect (upstream >-> p >-> downstream)
  where
    upstream   = fromSocket sckL 4096 >-> pipeDecode
    downstream = pipeEncode >-> toSocket sckR
-- | Hand the pipeline's producer to the continuation.  The continuation is
-- rank-2 because the producer's element type is hidden inside 'Pipeline'.
getBegin
  :: Pipeline m
  -> (forall a. (Serialize a) => Producer a m () -> x)
  -> x
getBegin pipeline cont =
  case pipeline of
    Pipeline source _ -> cont source
-- | Run the producer end of a pipeline: every value it yields is encoded and
-- written to the socket.
runBegin
  :: (Serialize a, MonadIO m) => Socket -> Producer a m () -> m ()
runBegin sck p =
  runEffect $ toSocket sck <-< pipeEncode <-< p
-- | Hand the pipeline's terminating 'Consumer' to the continuation, skipping
-- over all intermediate stages.  The continuation is rank-2 because the
-- consumer's input type is hidden inside the pipeline.
getEnd
  :: forall m x.
     Pipeline m
  -> (forall a. (Serialize a) => Consumer a m () -> x)
  -> x
getEnd (Pipeline _ stages) cont = final stages
  where
    final :: (Serialize b) => Pipeline_ b m -> x
    final remaining =
      case remaining of
        PLCons _ rest -> final rest
        PLEnd sink    -> cont sink
-- | Run the consumer end of a pipeline: bytes read from the socket (in
-- chunks of at most 4096 bytes) are decoded and fed to the consumer.
runEnd
  :: (Serialize a, MonadIO m) => Socket -> Consumer a m () -> m ()
runEnd sck p = runEffect (incoming >-> p)
  where
    incoming = fromSocket sck 4096 >-> pipeDecode
-- | Run a whole pipeline in-process, with no sockets or serialization in
-- between: the producer is composed directly with every stage in order.
runPipeline :: forall m. (Monad m) => Pipeline m -> Effect m ()
runPipeline (Pipeline source stages) = source >-> collapse stages
  where
    -- Fuse the remaining stages into a single consumer.
    collapse :: Pipeline_ a m -> Consumer a m ()
    collapse (PLCons stage rest) = stage >-> collapse rest
    collapse (PLEnd sink)        = sink
-- | |
-- | Example pipeline: emits the numbers 10, 20, 3 and 4, renders each with
-- 'show', reverses the resulting string, and prints it on its own line.
testPipeline :: Pipeline IO
testPipeline =
  each [10 :: Int, 20, 3, 4] |-->
  showNumber >--> reverseString >--|
  -- pipeEncode >--> pipeDecode >--|
  printNumber
  where
    -- Render every incoming number as a string.
    showNumber :: (Monad m) => Pipe Int String m a
    showNumber = for cat (yield . show)

    -- Reverse every incoming string.
    reverseString :: (Monad m) => Pipe String String m a
    reverseString = for cat (yield . reverse)

    -- Print every incoming string on its own line.
    printNumber :: Consumer String IO a
    printNumber = forever (await >>= lift . putStrLn)
-- | Entry point.  Runs one part of 'testPipeline' as its own process,
-- selected by the command line:
--
-- * @begin PORT@ serves on @PORT@ (bound to 127.0.0.1) and sends the
--   producer's encoded output to the accepted connection.
-- * @segment INDEX PORT UPSTREAM_HOST UPSTREAM_PORT@ serves on @PORT@ and,
--   once a connection is accepted, connects to the upstream stage and runs
--   intermediate stage @INDEX@ between the two sockets.
-- * @end UPSTREAM_HOST UPSTREAM_PORT@ connects to the upstream stage and
--   runs the final consumer on what it receives.
--
-- Any other argument list, or an @INDEX@ that is not a non-negative integer,
-- is rejected up front with a usage message.  Previously such input caused a
-- pattern-match failure or a @read@ "no parse" error, in some cases only
-- after a connection had been accepted.
main :: IO ()
main = do
  args <- getArgs
  case args of
    ["begin", port] ->
      myServe port $ \sck -> getBegin pipeline $ runBegin sck
    ["segment", ixStr, myPort, theirHostName, theirPort] ->
      -- 'reads' instead of 'read', so a malformed index is reported rather
      -- than crashing lazily inside the connection handler.
      case reads ixStr of
        [(ix, "")] | ix >= 0 ->
          -- We're in the middle: sckR is the accepted (downstream)
          -- connection, sckL the connection we open to the upstream stage.
          myServe myPort $ \sckR ->
            myConnect theirHostName theirPort $ \sckL ->
              getSegment pipeline ix $ runSegment sckL sckR
        _ -> usage ("invalid segment index " ++ show ixStr)
    ["end", hostName, port] ->
      myConnect hostName port $ \sck -> getEnd pipeline $ runEnd sck
    _ -> usage "bad usage"
  where
    pipeline = testPipeline

    -- Accept connections on the given port of 127.0.0.1, log the peer
    -- address and run the continuation on the connected socket.
    myServe port cont = serve (Host "127.0.0.1") port $ \(sck, addr) -> do
      putStrLn $ "TCP connection established from " ++ show addr
      cont sck

    -- Connect to the given host and port, log the peer address and run the
    -- continuation on the connected socket.
    myConnect hostName port cont = connect hostName port $ \(sck, addr) -> do
      putStrLn $ "Connection established to " ++ show addr
      cont sck

    -- Abort with the problem description followed by the accepted forms.
    usage :: String -> IO a
    usage problem = do
      prog <- getProgName
      ioError . userError $ unlines
        [ problem
        , "usage: " ++ prog ++ " begin PORT"
        , "       " ++ prog ++ " segment INDEX PORT UPSTREAM_HOST UPSTREAM_PORT"
        , "       " ++ prog ++ " end UPSTREAM_HOST UPSTREAM_PORT"
        ]
-- Encode/decode | |
------------------------------------------------------------------------ | |
-- | Incrementally decode a stream of 'B.ByteString' chunks into values.
--
-- Chunk boundaries need not line up with value boundaries: a value may span
-- several chunks, and one chunk may hold several values.  Bytes left over
-- after a complete value are fed straight back into a fresh parser.
--
-- Empty chunks are skipped.  A cereal 'Partial' continuation treats empty
-- input as "no more input", so passing one on would make the parser fail in
-- the middle of a value even though more data may follow.
--
-- Calls 'error' if the parser reports a failure.
pipeDecode :: forall a m r. (Serialize a, Monad m) => Pipe B.ByteString a m r
pipeDecode = go1 $ runGetPartial get
  where
    -- Wait for the next non-empty chunk and feed it to the parser.
    go1 :: (B.ByteString -> Result a) -> Pipe B.ByteString a m r
    go1 f = do
      bs <- await
      if B.null bs
        then go1 f
        else go2 f bs

    -- Feed bytes to the parser and act on the outcome.
    go2 :: (B.ByteString -> Result a) -> B.ByteString -> Pipe B.ByteString a m r
    go2 f bs =
      case f bs of
        Done x leftover -> do
          yield x
          -- The leftover bytes may already hold the next value(s).
          go2 (runGetPartial get) leftover
        Partial g ->
          go1 g
        Fail err _ ->
          error $ "pipeDecode: " ++ err
-- | Encode every incoming value with its 'Serialize' instance and yield the
-- resulting strict 'B.ByteString', one chunk per value.
pipeEncode :: forall a m r. (Serialize a, Monad m) => Pipe a B.ByteString m r
pipeEncode = for cat (yield . runPut . put)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment