Skip to content

Instantly share code, notes, and snippets.

@Davorak
Created April 22, 2014 03:25
Show Gist options
  • Save Davorak/11164450 to your computer and use it in GitHub Desktop.
Save Davorak/11164450 to your computer and use it in GitHub Desktop.
Full example of non-determinism while streaming with GIVars in HdpH
{-# LANGUAGE TemplateHaskell #-}
{-# language RankNTypes #-}
-- {-# LANGUAGE NoMonomorphismRestriction #-}
-- {-# LANGUAGE DeriveDataTypeable #-}
-- {-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE IncoherentInstances #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE FlexibleContexts #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
module Main where
import Prelude
import Data.Monoid (mconcat)
import System.Environment (getArgs)
import System.IO (stdout, stderr, hSetBuffering, BufferMode(..))
import Control.Parallel.HdpH
(RTSConf(..), defaultRTSConf, updateConf,
Par, runParIO_,
myNode, allNodes, io, pushTo, new, get, glob, rput, put,
Node, IVar, GIVar,
Thunk(Thunk), Closure, mkClosure,
toClosure, ToClosure(locToClosure),
static, StaticToClosure, staticToClosure,
fork,
StaticDecl, declare, register, here)
import qualified Control.Parallel.HdpH as HdpH (declareStatic)
----
import Pipes
import qualified Pipes.Prelude as P
import Data.Serialize (Serialize)
import Control.Parallel.HdpH.Closure (unClosure, apC)
----
-- 'ToClosure' instances for the element and reference types of the GIVar
-- streaming protocol below. Every instance implements 'locToClosure' with the
-- Template Haskell splice @$(here)@.
--
-- In the visible code 'toClosure' is applied at three of these types:
-- @(Int, ClosureR Int)@ (in 'toGCoPar'), @GIVar (Closure (Int, ClosureR Int))@
-- (in 'fromGCoPar') and
-- @GIVar (Closure (GIVar (Closure (Int, ClosureR Int))))@ (in 'testGCoPar').
-- The remaining instances are not used by anything in this file.
instance ToClosure (Int, ClosureR Int) where locToClosure = $(here)
instance ToClosure (Closure (Int, ClosureR Int)) where locToClosure = $(here)
instance ToClosure (GIVar (Closure (GIVar (Closure Int)))) where locToClosure = $(here)
instance ToClosure (GIVar (Closure (Int, ClosureR Int))) where locToClosure = $(here)
instance ToClosure (GIVar (Closure (GIVar (Closure (Int, ClosureR Int))))) where locToClosure = $(here)
instance ToClosure (GIVar (Closure Int)) where locToClosure = $(here)
instance ToClosure Int where locToClosure = $(here)
instance ToClosure (GIVar Int) where locToClosure = $(here)
instance ToClosure (ClosureR Int) where locToClosure = $(here)
-- | Recursive "request slot" used to stream values through 'GIVar's.
--
-- A 'ClosureR' wraps a global reference into which the reading side writes
-- (as a closure) the 'GIVar' it wants the next element delivered to. The
-- delivered payload pairs the element with the 'ClosureR' for the following
-- request, so the newtype ties the knot of an unbounded request/reply cycle.
newtype ClosureR a = ClosureR
  { unClosureR :: GIVar (Closure (GIVar (Closure (a, ClosureR a))))
  }
  deriving (Serialize)
-- | Writing end of the 'GIVar' stream: a 'Consumer' that hands every element
-- it receives to whoever is reading through 'fromGCoPar'.
--
-- For each element awaited from upstream it
--
--   1. reads the current request variable to learn the 'GIVar' the reader
--      wants the element delivered to,
--   2. creates a fresh request 'IVar' and obtains a 'GIVar' for it with 'glob',
--   3. 'rput's the element together with a 'ClosureR' around that new 'GIVar',
--   4. continues with the fresh request variable.
--
-- The argument is the request variable for the first element.
toGCoPar
  :: ToClosure (a, ClosureR a)
  => IVar (Closure (GIVar (Closure (a, ClosureR a))))
  -> Consumer a Par r
toGCoPar = feed
  where
    feed requestVar = do
      item <- await
      -- All Par-level steps for one element, in the same order as before:
      -- get, new, glob, rput.
      nextRequestVar <- lift $ do
        replySlot <- get requestVar
        freshVar <- new
        freshGVar <- glob freshVar
        rput (unClosure replySlot) (toClosure (item, ClosureR freshGVar))
        return freshVar
      feed nextRequestVar
-- | Reading end of the 'GIVar' stream: a 'Producer' that requests elements one
-- at a time from the side running 'toGCoPar'.
--
-- For each element it creates a local reply 'IVar', obtains a 'GIVar' for it
-- with 'glob', 'rput's that 'GIVar' into the current request slot, and then
-- blocks on the reply. The reply carries the element plus the request slot
-- (a 'ClosureR') to use for the next round.
--
-- The argument is the request slot for the first element.
fromGCoPar
  :: ToClosure (GIVar (Closure (a, ClosureR a)))
  => GIVar (Closure (GIVar (Closure (a, ClosureR a))))
  -> Producer a Par r
fromGCoPar = pull
  where
    pull requestGVar = do
      -- Publish where the reply should go, then wait for it.
      (item, next) <- lift $ do
        replyVar <- new
        replyGVar <- glob replyVar
        rput requestGVar (toClosure replyGVar) -- send the reference
        payload <- get replyVar
        return (unClosure payload)
      -- Debug trace emitted before every yield (the element itself is not shown).
      lift (io (print "from yield: "))
      yield item
      pull (unClosureR next)
-- | Task that is shipped as a closure (see 'testGCoPar'): pulls 'Int's from
-- the stream behind the given request slot and prints each one with
-- 'parPrint'.
--
-- Neither pipeline stage has a terminating branch in this file, so the
-- effect is not expected to return on its own.
remotePrint
  :: GIVar (Closure (GIVar (Closure (Int, ClosureR Int))))
  -> Par ()
remotePrint rgvar = runEffect (fromGCoPar rgvar >-> parPrint)
-- | Streams the numbers 1 to 10 through the 'GIVar' protocol.
--
-- One forked thread pushes a 'remotePrint' closure to this very node (the
-- node returned by 'myNode'); a second forked thread feeds the numbers into
-- 'toGCoPar'.
--
-- NOTE(review): both stages are only forked and nothing here waits for them,
-- so how much of the stream is printed before the enclosing computation ends
-- presumably depends on scheduling -- confirm against the termination
-- behaviour of 'runParIO_'.
testGCoPar :: Par ()
testGCoPar = do
  master <- myNode
  requestVar <- new
  requestGVar <- glob requestVar
  let printerTask = apC $(mkClosure [| remotePrint |]) (toClosure requestGVar)
  fork (pushTo printerTask master)
  -- adding or removing a @>-> P.tee parPrint@ stage between 'each' and
  -- 'toGCoPar' can change the behavior non-locally!
  fork (runEffect (each [1 .. 10 :: Int] >-> toGCoPar requestVar))
-- | Local ('IVar'-only) counterpart of 'ClosureR': a request variable into
-- which the reader puts the 'IVar' it wants the next element, paired with the
-- following request variable, delivered to.
newtype IVarR a = IVarR
  { unIVarR :: IVar (IVar (a, IVarR a))
  }
-- | Writing end of the purely local 'IVar' stream.
--
-- For each element awaited from upstream: read the reply 'IVar' out of the
-- current request variable, create the next request variable, 'put' the
-- element paired with it into the reply 'IVar', and carry on with the new
-- request variable.
toCoPar
  :: IVar (IVar (a, IVarR a))
  -> Consumer a Par r
toCoPar = feed
  where
    feed requestVar = do
      item <- await
      nextRequestVar <- lift $ do
        replyVar <- get requestVar
        freshVar <- new
        put replyVar (item, IVarR freshVar)
        return freshVar
      feed nextRequestVar
-- | Reading end of the purely local 'IVar' stream.
--
-- Each round creates a reply 'IVar', 'put's it into the current request
-- variable, blocks until the writer fills it, yields the received element and
-- continues with the request variable that came along with it.
fromCoPar
  :: IVar (IVar (a, IVarR a))
  -> Producer a Par r
fromCoPar = pull
  where
    pull requestVar = do
      (item, next) <- lift $ do
        replyVar <- new
        put requestVar replyVar
        get replyVar
      yield item
      pull (unIVarR next)
-- | Reader half of 'testCoPar': prints everything that arrives over the local
-- 'IVar' stream, then reports completion.
--
-- The completion message is only reached if the pipeline returns; neither
-- 'fromCoPar' nor 'parPrint' has a terminating branch in this file.
testCoParR :: Show a => IVar (IVar (a, IVarR a)) -> Par ()
testCoParR ref =
  runEffect (fromCoPar ref >-> parPrint)
    >> io (print "testCoParR done")
-- | Local-only variant of the streaming test: forks a writer that feeds
-- 1 to 10 into 'toCoPar' and a reader ('testCoParR') sharing the same
-- request variable, then prints a completion message without waiting for
-- either thread.
testCoPar :: Par ()
testCoPar = do
  ref <- new
  mapM_ fork
    [ runEffect (each [1 .. 10 :: Int] >-> toCoPar ref)
    , testCoParR ref
    ]
  io (print "testCoPar done")
-- | Consumer that prints every incoming element, prefixed with
-- @"parPrint: "@, via 'io'. The line is emitted with 'print', i.e. as a shown
-- 'String'.
parPrint :: Show a => Consumer a Par r
parPrint = P.mapM_ (io . print . ("parPrint: " ++) . show)
-----------------------------------------------------------------------------
-- Static declaration
-- Orphan 'ToClosure' instance for the unit type (unavoidably so: neither the
-- class nor the type is defined in this module). Its static is declared in
-- 'declareStatic' below.
instance ToClosure () where locToClosure = $(here)
-- | Static declarations of this module, handed to 'register' in 'main'.
--
-- Besides HdpH's own declarations and the 'remotePrint' static (needed for
-- the @mkClosure@ splice in 'testGCoPar'), this declares 'staticToClosure'
-- for every 'ToClosure' instance at which the module actually calls
-- 'toClosure'. Previously only the @()@ instance was declared, so closures
-- built by 'toGCoPar', 'fromGCoPar' and 'testGCoPar' referred to statics that
-- were never registered.
declareStatic :: StaticDecl
declareStatic =
  mconcat
    [ HdpH.declareStatic
    , declare (staticToClosure :: StaticToClosure ())
      -- payload sent by 'toGCoPar'
    , declare (staticToClosure :: StaticToClosure (Int, ClosureR Int))
      -- reply reference sent by 'fromGCoPar'
    , declare
        (staticToClosure :: StaticToClosure (GIVar (Closure (Int, ClosureR Int))))
      -- initial request slot captured in 'testGCoPar'
    , declare
        (staticToClosure
           :: StaticToClosure
                (GIVar (Closure (GIVar (Closure (Int, ClosureR Int))))))
    , declare $(static 'remotePrint)
      -- , declare $(static 'hello_abs)
    ]
-----------------------------------------------------------------------------
-- initialisation, argument processing and 'main'
-- | Parse runtime system config options from the command line.
--
-- Feeds the arguments to 'updateConf' starting from 'defaultRTSConf' and
-- returns the resulting configuration together with the arguments that were
-- left over. Aborts via 'error' (message prefixed with @"parseOpts: "@) if
-- 'updateConf' reports a failure.
parseOpts :: [String] -> IO (RTSConf, [String])
parseOpts args =
  updateConf args defaultRTSConf >>= either abort return
  where
    abort msg = error ("parseOpts: " ++ msg)
-- | Program entry point: line-buffer stdout and stderr, register the static
-- declarations, parse the RTS options from the command line and run
-- 'testGCoPar' with the network interface forced to @"lo"@.
main :: IO ()
main = do
  mapM_ (`hSetBuffering` LineBuffering) [stdout, stderr]
  register declareStatic
  (conf, _leftover) <- parseOpts =<< getArgs
  -- alternatives that were run here before: testCoPar, hello_world
  runParIO_ (conf { interface = "lo" }) testGCoPar
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment