Skip to content

Instantly share code, notes, and snippets.

@bitonic
Last active August 29, 2015 14:11
Show Gist options
  • Save bitonic/a48518b54615d17daad4 to your computer and use it in GitHub Desktop.
Save bitonic/a48518b54615d17daad4 to your computer and use it in GitHub Desktop.
{-# OPTIONS_GHC -Wall -fno-warn-name-shadowing #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE ScopedTypeVariables #-}
-- | Run one process per pipeline stage, starting with "begin" and ending
-- with "end", e.g.
--
-- @
-- ./MapReduce begin 10000 &
-- ./MapReduce segment 0 10001 '127.0.0.1' 10000 &
-- ./MapReduce segment 1 10002 '127.0.0.1' 10001 &
-- ./MapReduce end '127.0.0.1' 10002 &
-- @
module Main where
import Control.Monad
import qualified Data.ByteString as B
import Data.Serialize
import Pipes
import Pipes.Network.TCP
import System.Environment
-- | The part of a pipeline that follows the initial producer.
--
-- The first parameter is the element type the chain accepts and the second
-- is the base monad.  Element types between stages are hidden, but each one
-- is required to have a 'Serialize' instance.
data Pipeline_ a m
  = -- | The last stage: a consumer of the incoming values.
    PLEnd (Consumer a m ())
  | -- | A transforming stage followed by the rest of the chain.
    forall b. (Serialize b) => PLCons (Pipe a b m ()) (Pipeline_ b m)
-- | A complete pipeline: a producer together with the chain of stages that
-- processes its output.  The producer's element type is hidden; all that is
-- known about it is its 'Serialize' instance.
data Pipeline m where
  Pipeline :: (Serialize a) => Producer a m () -> Pipeline_ a m -> Pipeline m
infixr >-->

-- | Put a pipe in front of an existing chain of stages.
(>-->) :: (Serialize b) => Pipe a b m () -> Pipeline_ b m -> Pipeline_ a m
stage >--> rest = PLCons stage rest
infixr |-->

-- | Attach a producer to a chain of stages, giving a complete 'Pipeline'.
(|-->) :: (Serialize a) => Producer a m () -> Pipeline_ a m -> Pipeline m
source |--> stages = Pipeline source stages
infixr >--|

-- | Terminate a chain: a pipe whose output goes straight into the final
-- consumer.
(>--|) :: (Serialize b) => Pipe a b m () -> Consumer b m () -> Pipeline_ a m
(>--|) stage = PLCons stage . PLEnd
-- | Hand the pipe at the given zero-based position to a continuation.
--
-- Only the 'PLCons' stages are counted; the producer and the final consumer
-- are not addressable here.  Calls 'error' when the index is negative or
-- runs past the last pipe.
getSegment
  :: forall m x.
     Pipeline m
  -> Int
  -> (forall a b. (Serialize a, Serialize b) => Pipe a b m () -> x)
  -> x
getSegment (Pipeline _ stages) n cont
  | n < 0 = error "getSegment: negative index"
  | otherwise = walk n stages
  where
    -- Step along the chain, counting down until the wanted pipe is reached.
    walk :: (Serialize b) => Int -> Pipeline_ b m -> x
    walk remaining chain =
      case chain of
        PLCons p rest
          | remaining == 0 -> cont p
          | otherwise -> walk (remaining - 1) rest
        PLEnd _ -> error "getSegment: out of bounds"
-- | Run a middle stage between two sockets: bytes read from the first socket
-- are decoded, passed through the pipe, re-encoded and written to the second
-- socket.
runSegment
  :: (Serialize a, Serialize b, MonadIO m)
  => Socket -> Socket -> Pipe a b m () -> m ()
runSegment sckL sckR p = runEffect (upstream >-> p >-> downstream)
  where
    -- Decoded values arriving from the left-hand socket (4096-byte reads).
    upstream = fromSocket sckL 4096 >-> pipeDecode
    -- Encoder feeding the right-hand socket.
    downstream = pipeEncode >-> toSocket sckR
-- | Hand the pipeline's producer to a continuation.
getBegin
  :: Pipeline m
  -> (forall a. (Serialize a) => Producer a m () -> x)
  -> x
getBegin pipeline cont =
  case pipeline of
    Pipeline source _ -> cont source
-- | Run the first stage: encode everything the producer yields and write it
-- to the socket.
runBegin
  :: (Serialize a, MonadIO m) => Socket -> Producer a m () -> m ()
runBegin sck p = runEffect (p >-> wire)
  where
    -- Encoder feeding the socket.
    wire = pipeEncode >-> toSocket sck
-- | Hand the pipeline's final consumer to a continuation.
getEnd
  :: forall m x.
     Pipeline m
  -> (forall a. (Serialize a) => Consumer a m () -> x)
  -> x
getEnd (Pipeline _ stages) cont = findSink stages
  where
    -- Skip every intermediate pipe until the terminating consumer is found.
    findSink :: (Serialize b) => Pipeline_ b m -> x
    findSink chain =
      case chain of
        PLCons _ rest -> findSink rest
        PLEnd sink -> cont sink
-- | Run the last stage: decode the bytes read from the socket and feed the
-- resulting values to the consumer.
runEnd
  :: (Serialize a, MonadIO m) => Socket -> Consumer a m () -> m ()
runEnd sck p = runEffect (decoded >-> p)
  where
    -- Decoded values arriving from the socket (4096-byte reads).
    decoded = fromSocket sck 4096 >-> pipeDecode
-- | Fuse the whole pipeline into a single in-process 'Effect', with no
-- serialisation or sockets involved.
runPipeline :: forall m. (Monad m) => Pipeline m -> Effect m ()
runPipeline (Pipeline source stages) = source >-> collapse stages
  where
    -- Compose all remaining stages into one consumer.
    collapse :: Pipeline_ a m -> Consumer a m ()
    collapse chain =
      case chain of
        PLEnd sink -> sink
        PLCons p rest -> p >-> collapse rest
--
-- | Example pipeline: yield a few numbers, render each as a string, reverse
-- the string and print it.
testPipeline :: Pipeline IO
testPipeline =
  numbers |-->
  showNumber >--> reverseString >--|
  -- pipeEncode >--> pipeDecode >--|
  printNumber
  where
    -- The fixed input values.
    numbers :: Producer Int IO ()
    numbers = mapM_ yield [10, 20, 3, 4]

    -- Render every incoming number with 'show'.
    showNumber :: (Monad m) => Pipe Int String m a
    showNumber = forever (await >>= yield . show)

    -- Reverse every incoming string.
    reverseString :: (Monad m) => Pipe String String m a
    reverseString = forever (await >>= yield . reverse)

    -- Print every incoming string on its own line.
    printNumber :: Consumer String IO a
    printNumber = forever (await >>= lift . putStrLn)
-- | Entry point: run a single stage of 'testPipeline', selected by the
-- command-line arguments.
--
-- Accepted invocations:
--
-- * @begin PORT@ serves on 127.0.0.1:PORT and runs the producer against each
--   connection.
-- * @segment INDEX PORT HOST HOSTPORT@ serves on 127.0.0.1:PORT and, per
--   connection, connects to HOST:HOSTPORT and runs the pipe at zero-based
--   position INDEX between the two sockets.
-- * @end HOST PORT@ connects to HOST:PORT and runs the final consumer.
--
-- Any other argument list, including a wrong number of arguments or a
-- non-numeric INDEX, aborts with a "bad usage" error instead of a
-- pattern-match failure or a @Prelude.read@ crash.
main :: IO ()
main = do
  args <- getArgs
  case args of
    ["end", hostName, port] ->
      myConnect hostName port $ \sck -> getEnd pipeline $ runEnd sck
    ["begin", port] ->
      myServe port $ \sck -> getBegin pipeline $ runBegin sck
    ["segment", ixStr, myPort, theirHostName, theirPort] ->
      -- We're in the middle.  Parse the index totally rather than with
      -- 'read', which would throw on malformed input.
      case reads ixStr of
        [(ix, "")] ->
          myServe myPort $ \sckR ->
            myConnect theirHostName theirPort $ \sckL ->
              getSegment pipeline ix $ runSegment sckL sckR
        _ ->
          error $ "bad usage: invalid segment index " ++ show ixStr
    _ ->
      error "bad usage"
  where
    pipeline = testPipeline

    -- Serve on the loopback address, logging each peer before handing the
    -- socket to the continuation.
    myServe port cont = serve (Host "127.0.0.1") port $ \(sck, addr) -> do
      putStrLn $ "TCP connection established from " ++ show addr
      cont sck

    -- Connect to the given host and port, logging the remote address before
    -- handing the socket to the continuation.
    myConnect hostName port cont = connect hostName port $ \(sck, addr) -> do
      putStrLn $ "Connection established to " ++ show addr
      cont sck
-- Encode/decode
------------------------------------------------------------------------
-- | Incrementally decode a stream of byte chunks into values.
--
-- Chunks are fed to a cereal partial parser.  Each time a value is completed
-- it is yielded and a fresh parser is started on the leftover bytes, so a
-- single chunk may produce several values and one value may span several
-- chunks.
--
-- Empty chunks are skipped: cereal treats an empty 'B.ByteString' passed to
-- a 'Partial' continuation as end of input, so forwarding one would turn a
-- harmless empty read into a decoding failure.
--
-- Calls 'error' with the parser's message if decoding fails.
pipeDecode :: forall a m r. (Serialize a, Monad m) => Pipe B.ByteString a m r
pipeDecode = awaitChunk freshParser
  where
    -- A parser for the next value, with no input consumed yet.
    freshParser :: B.ByteString -> Result a
    freshParser = runGetPartial get

    -- Wait for the next non-empty chunk and feed it to the pending parser.
    awaitChunk :: (B.ByteString -> Result a) -> Pipe B.ByteString a m r
    awaitChunk k = do
      bs <- await
      if B.null bs
        then awaitChunk k
        else feed k bs

    -- Feed bytes to the parser and act on the outcome.
    feed :: (B.ByteString -> Result a) -> B.ByteString -> Pipe B.ByteString a m r
    feed k bs =
      case k bs of
        Done x leftover -> do
          yield x
          -- The leftover may already hold (part of) the next value.
          feed freshParser leftover
        Partial k' ->
          awaitChunk k'
        Fail err _ ->
          error $ "pipeDecode: " ++ err
-- | Serialise every incoming value and yield the resulting bytes, one chunk
-- per value.
pipeEncode :: forall a m r. (Serialize a, Monad m) => Pipe a B.ByteString m r
pipeEncode = for cat (yield . runPut . put)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment