Created
April 22, 2014 03:25
-
-
Save Davorak/11164450 to your computer and use it in GitHub Desktop.
Full example of non-determinism while streaming with GIVars in HdpH
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE TemplateHaskell #-} | |
{-# language RankNTypes #-} | |
-- {-# LANGUAGE NoMonomorphismRestriction #-} | |
-- {-# LANGUAGE DeriveDataTypeable #-} | |
-- {-# LANGUAGE DeriveGeneric #-} | |
{-# LANGUAGE GeneralizedNewtypeDeriving #-} | |
{-# LANGUAGE FlexibleInstances #-} | |
{-# LANGUAGE IncoherentInstances #-} | |
{-# LANGUAGE ScopedTypeVariables #-} | |
{-# LANGUAGE FlexibleContexts #-} | |
{-# OPTIONS_GHC -fno-warn-orphans #-} | |
module Main where | |
import Prelude | |
import Data.Monoid (mconcat) | |
import System.Environment (getArgs) | |
import System.IO (stdout, stderr, hSetBuffering, BufferMode(..)) | |
import Control.Parallel.HdpH | |
(RTSConf(..), defaultRTSConf, updateConf, | |
Par, runParIO_, | |
myNode, allNodes, io, pushTo, new, get, glob, rput, put, | |
Node, IVar, GIVar, | |
Thunk(Thunk), Closure, mkClosure, | |
toClosure, ToClosure(locToClosure), | |
static, StaticToClosure, staticToClosure, | |
fork, | |
StaticDecl, declare, register, here) | |
import qualified Control.Parallel.HdpH as HdpH (declareStatic) | |
---- | |
import Pipes | |
import qualified Pipes.Prelude as P | |
import Data.Serialize (Serialize) | |
import Control.Parallel.HdpH.Closure (unClosure, apC) | |
---- | |
instance ToClosure (Int, ClosureR Int) where locToClosure = $(here) | |
instance ToClosure (Closure (Int, ClosureR Int)) where locToClosure = $(here) | |
instance ToClosure (GIVar (Closure (GIVar (Closure Int)))) where locToClosure = $(here) | |
instance ToClosure (GIVar (Closure (Int, ClosureR Int))) where locToClosure = $(here) | |
instance ToClosure (GIVar (Closure (GIVar (Closure (Int, ClosureR Int))))) where locToClosure = $(here) | |
instance ToClosure (GIVar (Closure Int)) where locToClosure = $(here) | |
instance ToClosure Int where locToClosure = $(here) | |
instance ToClosure (GIVar Int) where locToClosure = $(here) | |
instance ToClosure (ClosureR Int) where locToClosure = $(here) | |
{- | The below uses a newtype wrapper to create an infinite request cycle with | |
- 'GIVar's. | |
-} | |
newtype ClosureR a = ClosureR { unClosureR :: GIVar (Closure (GIVar (Closure (a, ClosureR a)))) } | |
deriving (Serialize) | |
toGCoPar | |
:: ToClosure (a, ClosureR a) => | |
IVar (Closure (GIVar (Closure (a, ClosureR a)))) | |
-> Consumer a Par r | |
toGCoPar ref0 = loop ref0 | |
where | |
loop ref = do | |
x <- await | |
sRef <- fmap unClosure . lift $ get ref | |
nextRef <- lift new | |
gnextRef <- lift $ glob nextRef | |
lift $ rput sRef $ toClosure (x, ClosureR gnextRef) | |
loop nextRef | |
fromGCoPar | |
:: ToClosure (GIVar (Closure (a, ClosureR a))) => | |
GIVar (Closure (GIVar (Closure (a, ClosureR a)))) | |
-> Producer a Par r | |
fromGCoPar rgvar0 = loop rgvar0 | |
where | |
loop rgvar = do | |
-- take rgvar that links to remote process. Make new global reference | |
-- so the remote process can send you data over it (var, gvar) | |
var <- lift new | |
gvar <- lift $ glob var | |
lift $ rput rgvar (toClosure gvar) -- Send the reference | |
(x, nextRef) <- fmap unClosure . lift $ get var | |
lift . io $ print $ "from yield: " -- ++ show x | |
yield x | |
loop $ unClosureR nextRef | |
remotePrint :: GIVar (Closure (GIVar (Closure (Int, ClosureR Int)))) | |
-> Par () | |
remotePrint rgvar = do | |
runEffect $ fromGCoPar rgvar >-> parPrint | |
return () | |
testGCoPar | |
:: Par () | |
testGCoPar = do | |
master <- myNode | |
sp <- new | |
gsp <- glob sp | |
fork $ pushTo (apC $(mkClosure [| remotePrint |]) (toClosure gsp)) master | |
-- adding or removing the P.tee parPrint below can change the behavior non-locally! | |
fork $ runEffect $ each [1..10::Int] -- >-> P.tee parPrint | |
>-> toGCoPar sp | |
newtype IVarR a = IVarR { unIVarR :: IVar (IVar (a, IVarR a)) } | |
toCoPar | |
:: IVar (IVar (a, IVarR a)) | |
-> Consumer a Par r | |
toCoPar ref0 = loop ref0 | |
where | |
loop ref = do | |
x <- await | |
sRef <- lift $ get ref | |
nextRef <- lift new | |
lift $ put sRef (x, IVarR nextRef) | |
loop nextRef | |
fromCoPar | |
:: IVar (IVar (a, IVarR a)) | |
-> Producer a Par r | |
fromCoPar ref0 = loop ref0 | |
where | |
loop ref = do | |
lp <- lift new | |
lift $ put ref lp | |
(r, nextRef) <- lift $ get lp | |
yield r | |
loop $ unIVarR nextRef | |
testCoParR :: Show a => IVar (IVar (a, IVarR a)) -> Par () | |
testCoParR ref = do | |
runEffect $ fromCoPar ref >-> parPrint | |
io $ print "testCoParR done" | |
testCoPar :: Par () | |
testCoPar = do | |
ref <- new | |
fork $ runEffect $ each [1..10::Int] >-> toCoPar ref | |
fork $ testCoParR ref | |
io $ print "testCoPar done" | |
parPrint :: Show a => Consumer a Par r | |
parPrint = go | |
where | |
go = do | |
x <- await | |
lift . io $ print $ "parPrint: " ++ show x | |
go | |
----------------------------------------------------------------------------- | |
-- Static declaration | |
-- orphan ToClosure instance (unavoidably so) | |
instance ToClosure () where locToClosure = $(here) | |
declareStatic :: StaticDecl | |
declareStatic = mconcat [HdpH.declareStatic, | |
declare (staticToClosure :: StaticToClosure ()) | |
, declare $(static 'remotePrint) | |
-- , declare $(static 'hello_abs) | |
] | |
----------------------------------------------------------------------------- | |
-- initialisation, argument processing and 'main' | |
-- parse runtime system config options; abort if there is an error | |
parseOpts :: [String] -> IO (RTSConf, [String]) | |
parseOpts args = do | |
either_conf <- updateConf args defaultRTSConf | |
case either_conf of | |
Left err_msg -> error $ "parseOpts: " ++ err_msg | |
Right (conf, remaining_args) -> return (conf, remaining_args) | |
main :: IO () | |
main = do | |
hSetBuffering stdout LineBuffering | |
hSetBuffering stderr LineBuffering | |
register declareStatic | |
opts_args <- getArgs | |
(conf, _args) <- parseOpts opts_args | |
runParIO_ (conf { interface="lo" }) testGCoPar -- testCoPar -- hello_world |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment