Skip to content

Instantly share code, notes, and snippets.

@YoEight
Created February 9, 2014 18:47
Show Gist options
  • Save YoEight/8904145 to your computer and use it in GitHub Desktop.
Save YoEight/8904145 to your computer and use it in GitHub Desktop.
A possible implementation of Process in Haskell.
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE GADTs #-}
module Data.Process where
import Prelude hiding (zipWith)
import Control.Applicative
import Control.Exception (onException)
import Control.Monad
import Data.Foldable
import System.IO
import System.IO.Error
infixr 9 <~
infixl 9 ~>
-- | A single step of a stream computation.
--
-- * @m@ is the type constructor of the requests that can be awaited,
-- * @o@ is the type of the emitted values,
-- * @r@ is the type of "the rest of the computation".
data Step m o r
  = -- | Emit one value, then carry on with the rest.
    Yield o r
  | -- | A request, the continuation that receives its answer, and a
    -- fallback to use when the request cannot be answered.
    forall a. Await (m a) (a -> r) r
  | -- | Nothing more to do.
    Stop
-- | A stream of @o@ values that may issue @m@ requests: a 'Step' whose
-- continuations are themselves 'Process'es.
newtype Process m o = Process
  { unProcess :: Step m o (Process m o)
  }
-- | Request type for single-input processes. Its only constructor, 'Is',
-- fixes the answer type to be the input type itself.
data Is a b where
  Is :: Is a a
-- | A process whose only request is 'Is': it awaits values of type @a@ and
-- emits values of type @b@.
type Process1 a b = Process (Is a) b
-- | Requests of a two-input process: 'TL' is answered with a value of the
-- first (left) type, 'TR' with a value of the second (right) type.
data TeeOp a b c where
  TL :: TeeOp a b a
  TR :: TeeOp a b b
-- | A process that awaits from a left input of type @a@ and a right input of
-- type @b@ (via 'TeeOp') and emits values of type @c@.
type Tee a b c = Process (TeeOp a b) c
-- | Requests of a 'Wye': 'WL' is answered with a left value, 'WR' with a
-- right value, and 'WBoth' with an 'Either' of the two.
data WyeOp a b c where
  WL :: WyeOp a b a
  WR :: WyeOp a b b
  WBoth :: WyeOp a b (Either a b)
-- | A process whose requests are 'WyeOp's over inputs @a@ and @b@, emitting
-- values of type @c@.
type Wye a b c = Process (WyeOp a b) c
-- | A process that emits effectful functions from @a@ to @m b@.
type Channel a m b = Process m (a -> m b)
-- | A 'Channel' whose functions consume an @a@ and return no result.
type Sink m a = Channel a m ()
-- | Continuation-passing description of a computation that can emit @o@
-- values, await @f@ requests, halt, or finish with a result of type @u@.
-- Running a plan means supplying one continuation per possible outcome.
newtype Plan f o u
  = Plan
  { unPlan ::
      forall r.
      (u -> r) -> -- done: receives the plan's final result
      (o -> r -> r) -> -- emit: receives an output and the rest of the run
      (forall a. f a -> (a -> r) -> r -> r) -> -- await: request, continuation, fallback
      r -> -- halt: used when the plan stops without a result
      r
  }
-- | Mapping over a plan transforms the value handed to the "done"
-- continuation; emits, awaits and halts are passed through unchanged.
instance Functor (Plan f o) where
  fmap g plan = Plan $ \done emit request stop ->
    unPlan plan (done . g) emit request stop
-- | 'pure' finishes immediately by handing its argument to the "done"
-- continuation. It is defined here (rather than as @pure = return@) so the
-- instances follow the canonical direction, with 'return' delegating to
-- 'pure'.
instance Applicative (Plan f o) where
  pure a = Plan $ \kp _ _ _ -> kp a
  (<*>) = ap

-- | Sequencing runs the first plan with a "done" continuation that starts the
-- plan produced by @f@; the emit, await and halt continuations are shared by
-- both plans.
instance Monad (Plan f o) where
  return = pure
  Plan k >>= f = Plan $ \kp ke ka kr ->
    k (\a -> unPlan (f a) kp ke ka kr) ke ka kr
-- | Turn a 'Plan' into a 'Process' that runs the plan over and over: whenever
-- the plan finishes with a result, the result is discarded and the plan
-- starts again. Halting the plan stops the process.
repeatedly :: Plan m o u -> Process m o
repeatedly plan = loop
  where
    loop =
      Process $
        unPlan
          plan
          (\_ -> unProcess loop)
          (\out rest -> Yield out (Process rest))
          (\req cont fallback -> Await req (Process . cont) (Process fallback))
          Stop
-- | Turn a 'Plan' into a 'Process' that runs the plan exactly once: the
-- process stops both when the plan finishes with a result (which is
-- discarded) and when it halts.
process :: Plan m o u -> Process m o
process plan =
  Process $
    unPlan
      plan
      (\_ -> Stop)
      (\out rest -> Yield out (Process rest))
      (\req cont fallback -> Await req (Process . cont) (Process fallback))
      Stop
-- | Build a 'Plan' that streams values out of an 'IO'-managed resource.
--
-- Arguments, in order:
--
-- * @ack@: acquires the resource (awaited once, first);
-- * @release@: releases the resource;
-- * @step@: produces the next value, or 'Nothing' when exhausted.
--
-- After acquisition the plan repeatedly awaits @step@. A 'Just' value is
-- yielded and the loop continues; 'Nothing' awaits @release@ and finishes.
-- If the plan is halted after a step has been answered, @release@ is awaited
-- as the fallback (see 'onAwaitFb').
--
-- If @step@ throws, @release@ is run and the exception is rethrown. This
-- covers every exception, not only 'IOError', so the resource is not leaked
-- when @step@ fails in some other way.
resource :: IO r
         -> (r -> IO ())
         -> (r -> IO (Maybe o))
         -> Plan IO o ()
resource ack release step = onAwait go ack
  where
    -- Await one step; once it has been answered, halting runs the cleanup.
    go r = onAwaitFb (go1 r) (handle r) (cleanup r)

    -- One step, guarded so that a failure releases the resource before the
    -- exception propagates.
    handle r = step r `onException` release r

    go1 r (Just o) = yield o >> go r
    go1 r _ = cleanup r

    cleanup r = await (release r)
-- | Await @rq@ and feed its answer to @k@, with @fb@ as a fallback.
--
-- While the plan returned by @k@ runs, halting runs @fb@ instead of the
-- caller's halt continuation. The fallback handed to the await itself is
-- still the caller's halt continuation, so @fb@ only comes into play once
-- the request has been answered.
onAwaitFb :: (a -> Plan f o u)
          -> f a
          -> Plan f o u
          -> Plan f o u
onAwaitFb k rq fb = Plan $ \done emit request stop ->
  let onHalt = unPlan fb done emit request stop
   in request rq (\a -> unPlan (k a) done emit request onHalt) stop
-- | Await @rq@ and continue with the plan @k@ builds from its answer. The
-- caller's halt continuation is used both as the await's fallback and while
-- the continuation runs.
onAwait :: (a -> Plan f o u) -> f a -> Plan f o u
onAwait k rq = Plan $ \done emit request stop ->
  request rq (\a -> unPlan (k a) done emit request stop) stop
-- | Issue the request @fa@ and finish with its answer; the halt continuation
-- serves as the request's fallback.
await :: f a -> Plan f o a
await fa = Plan $ \done _ request stop -> request fa done stop
-- | Await the next input of a single-input plan (request 'Is').
await1 :: Plan (Is a) o a
await1 = await Is
-- | Await a value from the left input of a 'Tee' plan (request 'TL').
awaitL :: Plan (TeeOp a b) o a
awaitL = await TL
-- | Await a value from the right input of a 'Tee' plan (request 'TR').
awaitR :: Plan (TeeOp a b) o b
awaitR = await TR
-- | Emit one value, then finish with @()@.
yield :: o -> Plan f o ()
yield o = Plan $ \done emit _ _ -> emit o (done ())
-- | Stop the plan right away: only the halt continuation is used, so no
-- result is ever produced.
halt :: Plan f o a
halt = Plan $ \_ _ _ stop -> stop
-- | The process that does nothing: it consists of a single 'Stop' step.
stopped :: Process m o
stopped = Process Stop
-- | Lift a pure function to a 'Process1' that, repeatedly, awaits one input
-- and emits the function applied to it.
liftF :: (a -> b) -> Process1 a b
liftF f = repeatedly (await1 >>= yield . f)
-- | Feed the output of a source process into a single-input process.
--
-- The consumer (left operand) drives the pipeline:
--
-- * its emitted values are emitted by the result;
-- * when it awaits, the source is stepped: an emitted value answers the
--   request, a stopped source switches the consumer to its fallback (fed by
--   'stopped'), and the source's own requests are passed outward with the
--   consumer left waiting.
(<~) :: Process1 a b -> Process m a -> Process m b
consumer <~ producer =
  Process $ case unProcess consumer of
    Stop -> Stop
    Yield b rest -> Yield b (rest <~ producer)
    Await Is feed fallback -> case unProcess producer of
      Stop -> unProcess (fallback <~ stopped)
      Yield a producer' -> unProcess (feed a <~ producer')
      Await req cont producerFb ->
        Await req ((consumer <~) . cont) (consumer <~ producerFb)
-- | Flipped '<~': pipe the source on the left into the single-input process
-- on the right.
(~>) :: Process m a -> Process1 a b -> Process m b
(~>) = flip (<~)
-- | Run a 'Tee' against a left and a right source.
--
-- The 'Tee' drives the pipeline: its emitted values are emitted by the
-- result. A 'TL' request steps the left source and a 'TR' request steps the
-- right one. For the requested side, an emitted value answers the request, a
-- stopped source switches the 'Tee' to its fallback (with that side replaced
-- by 'stopped'), and the source's own requests are passed outward with the
-- 'Tee' left waiting.
tee :: Tee a b c -> Process m a -> Process m b -> Process m c
tee t left right =
  Process $ case unProcess t of
    Stop -> Stop
    Yield c rest -> Yield c (tee rest left right)
    Await TL feed fallback -> case unProcess left of
      Stop -> unProcess (tee fallback stopped right)
      Yield a left' -> unProcess (tee (feed a) left' right)
      Await req cont leftFb ->
        Await req (\x -> tee t (cont x) right) (tee t leftFb right)
    Await TR feed fallback -> case unProcess right of
      Stop -> unProcess (tee fallback left stopped)
      Yield b right' -> unProcess (tee (feed b) left right')
      Await req cont rightFb ->
        Await req (\x -> tee t left (cont x)) (tee t left rightFb)
-- | A 'Tee' that, repeatedly, awaits one value from the left, then one from
-- the right, and emits the function applied to the pair.
zipWith :: (a -> b -> c) -> Tee a b c
zipWith f = repeatedly ((f <$> awaitL <*> awaitR) >>= yield)
-- | Execute the actions emitted by a process: every emitted @m a@ becomes an
-- awaited request whose answer is then emitted (its fallback is 'stopped').
-- Requests already made by the process are kept, with 'eval' applied to their
-- continuation and fallback.
eval :: Process m (m a) -> Process m a
eval p =
  Process $ case unProcess p of
    Stop -> Stop
    Yield action rest ->
      Await action (\a -> Process (Yield a (eval rest))) stopped
    Await req cont fallback -> Await req (eval . cont) (eval fallback)
-- | Send every value of a process through a 'Channel': values and channel
-- functions are paired up with 'tee' and 'zipWith', and the resulting actions
-- are executed with 'eval'.
through :: Process m a -> Channel a m b -> Process m b
through p1 p2 = eval (tee (zipWith (\x f -> f x)) p1 p2)
-- | A process that emits every element of a 'Foldable', in traversal order,
-- and then stops.
source :: Foldable f => f a -> Process m a
source xs = process (traverse_ yield xs)
-- | Drive a process to completion inside @m@: awaited requests are executed
-- and their answers fed back, emitted values are discarded, and fallbacks
-- are never used.
run :: Monad m => Process m a -> m ()
run p = case unProcess p of
  Stop -> return ()
  Yield _ rest -> run rest
  Await req cont _ -> req >>= run . cont
-- | A 'Sink' that keeps emitting 'print', so every value sent through it is
-- printed with its 'Show' instance.
printSink :: Show a => Sink IO a
printSink = repeatedly $ yield print
-- | Example: send the numbers 1 to 10 through 'printSink', printing one
-- number per line.
test :: IO ()
test = run $ through (source [1..10]) printSink
-- Output
-- 1
-- 2
-- 3
-- 4
-- 5
-- 6
-- 7
-- 8
-- 9
-- 10
-- | Example: stream the file at @path@ line by line through 'printSink'.
-- @"Open"@ is printed before the file is opened and @"Close"@ after the
-- handle has been closed; everything goes through 'print', so each line
-- appears as a quoted string.
testFile :: FilePath -> IO ()
testFile path = run (through src printSink)
  where
    src = process (resource open close next)

    open = print "Open" >> openFile path ReadMode

    close h = hClose h >> print "Close"

    -- Read the next line, or signal exhaustion at end of file.
    next h = do
      eof <- hIsEOF h
      if eof
        then return Nothing
        else Just <$> hGetLine h
-- *Data.Process> testFile "stream.cabal"
-- "Open"
-- "-- Initial stream.cabal generated by cabal init. For further"
-- "-- documentation, see http://haskell.org/cabal/users-guide/"
-- ""
-- "name: stream"
-- "version: 0.1.0.0"
-- "-- synopsis:"
-- "-- description:"
-- "-- license:"
-- "license-file: LICENSE"
-- "author: Yorick Laupa"
-- "-- maintainer:"
-- "-- copyright:"
-- "category: Data"
-- "build-type: Simple"
-- "cabal-version: >=1.8"
-- ""
-- "library"
-- " exposed-modules: Data.Process"
-- " -- other-modules:"
-- " build-depends: base ==4.6.*"
-- " , semigroups >=0.9.2"
-- "Close"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment