Skip to content

Instantly share code, notes, and snippets.

@paolino
Last active May 22, 2024 10:01
Show Gist options
  • Save paolino/d17ebc1b1e01d7ded3455fd9f7c0dfec to your computer and use it in GitHub Desktop.
Save paolino/d17ebc1b1e01d7ded3455fd9f7c0dfec to your computer and use it in GitHub Desktop.
Push to pull transformation, from push callbacks to pull streaming, with generics
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeOperators #-}
module Control.PushPull.Step
where
import Prelude hiding (take)
import Control.Concurrent.MVar
( newEmptyMVar
, putMVar
, readMVar
)
import Control.Monad.Fix (fix)
import Data.Kind (Type)
import Streaming
( Stream
, effect
, inspect
, wrap
)
-- This imports are for the exericise
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (withAsync)
import Control.Concurrent.STM
( atomically
, newTChanIO
, readTChan
, writeTChan
)
import Control.Monad (forever)
import Control.Monad.Cont (ContT (..), evalContT)
import Control.Monad.IO.Class (liftIO)
-- | A callback that takes an input and returns an output in a monad.
newtype Callback m a b = Callback (a -> m b)
-- | A request that stores an input and a callback to return an output.
data Request m a b = Request a (b -> m ())
-- | Domain and codomain of the callback.
data Domains a b = Domains a b
{-| A record of callbacks that can be passed to the service to transform
its nature from push to pull.
-}
type CallbackProd m fs = ProductMT Callback m fs
{-| A generic functor serves as an element to store pure requests in a stream. The
'x' is necessary to store the stream continuation into.
Notice that the 'Stream' data type cleverly differentiates between the pure
layers and the effectful layers. This allows us to store a pure request instead
of a 'Request' data type.
The stream producer will handle the composition of the pure layer with the
effectful layer to store the 'Request' data type. The net result is that the
parameter 'm' does not appear in the 'ElementLayer' data type.
-}
data ElementLayer as x where
This :: a -> (b -> x) -> ElementLayer (Domains a b ': as) x
Next :: ElementLayer as x -> ElementLayer (a ': as) x
instance Functor (ElementLayer as) where
fmap g (This x f) = This x $ g . f
fmap f (Next e) = Next (fmap f e)
-- | A stream of AnyElement that can be pulled from.
type ElementStream m as = Stream (ElementLayer as) m
-- | `fmap snd` at the type level
type family Codomains as where
Codomains '[] = '[]
Codomains (Domains _a b ': as) = b ': Codomains as
{-| A cell that can be taken from and put into. We instroduce this abstraction
to avoid binding the implementation to IO / MVar
-}
data Cell m a = Cell
{ put :: a -> m ()
, take :: m a
}
-- | A function that creates a new cell. Nice Rank2 synonym to pass around
type NewCell m = forall a. m (Cell m a)
{-| Create a new cell using an MVar. MVar has a throughput of 150k messages per second
This is the only implementation of the `NewCell` type that we provide.
A better solution would be for the libraries to expose a streaming interface
That would obviously have no internal blocking.
-}
newMVarCell :: NewCell IO
newMVarCell = do
mvar <- newEmptyMVar
pure
$ Cell
{ put = putMVar mvar
, take = readMVar mvar
}
{-| Create the callbacks to transform a push service into a pull service and
the relative stream to pull from.
-}
newPushToPull
:: (MkRequestSumCells (Codomains fs), MkCallback fs, Monad m)
=> NewCell m
-> m (ElementStream m fs (), CallbackProd m fs)
newPushToPull newCell = do
requests <- newCell
outputs <- mkVar newCell
pure (mkProducer requests, mkCallback requests outputs id)
{-| A generic record specially tailored for fields that are of the form `f m a b`.
we cannot reuse `AllF`
-}
data ProductMT f (m :: Type -> Type) abs where
NilMT :: ProductMT f m '[]
OneMT :: f m a b -> ProductMT f m abs -> ProductMT f m (Domains a b ': abs)
-- | A generic record specially tailored for fields that are of the form `f a`.
data ProductF f as where
NilFunctor :: ProductF f '[]
OneFunctor :: f a -> ProductF f as -> ProductF f (a : as)
-- | A generic sum type specially tailored for fields that are of the form `f m a b`.
data SumMT f (m :: Type -> Type) abs where
ThisMT :: f m a b -> SumMT f m (Domains a b ': abs)
NextMT :: SumMT f m abs -> SumMT f m (a ': abs)
-- | A generic sum type specially tailored for fields that are requests
type RequestSum m fs = SumMT Request m fs
-- | Create a streams out of requests blocked in a Cell.
mkProducer :: Monad m => Cell m (RequestSum m fs) -> ElementStream m fs ()
mkProducer requests = fix $ \producer -> effect $ do
request <- take requests
pure $ wrap $ consLayer producer request
-- | Cons a request to the stream.
consLayer
:: Monad m
=> ElementStream m as ()
-- ^ the stream to loop into
-> RequestSum m fs
-> ElementLayer fs (ElementStream m as ())
consLayer producer = \case
ThisMT (Request a k) ->
This a $ \o -> effect $ do
k o
pure producer
NextMT s -> Next $ consLayer producer s
-- | A generic record holding one Cell in each field to store the outputs of the callbacks.
type Outputs m os = ProductF (Cell m) os
-- | Create a record of Cells to store the outputs of the callbacks.
class MkRequestSumCells os where
mkVar :: Monad m => NewCell m -> m (Outputs m os)
instance MkRequestSumCells '[] where
mkVar :: Monad m => NewCell m -> m (Outputs m '[])
mkVar _ = pure NilFunctor
instance MkRequestSumCells os => MkRequestSumCells (o ': os) where
mkVar :: Monad m => NewCell m -> m (Outputs m (o : os))
mkVar newCell = do
var <- newCell
OneFunctor var <$> mkVar newCell
{-| Create a record of callbacks to that use
* a common input Cell to store all requests types as a generic sum
* a record of Cells to store the outputs of the callbacks.
-}
class MkCallback fs where
mkCallback
:: Monad m
=> Cell m (RequestSum m as)
-- ^ requests for the stream consumer
-> ProductF (Cell m) (Codomains fs)
-> (RequestSum m fs -> RequestSum m as)
-> CallbackProd m fs
instance MkCallback '[] where
mkCallback
:: Cell m (RequestSum m as)
-> ProductF (Cell m) (Codomains '[])
-> (RequestSum m '[] -> RequestSum m as)
-> CallbackProd m '[]
mkCallback _ _ _ = NilMT
instance MkCallback fs => MkCallback (Domains a b ': fs) where
mkCallback
:: Monad m
=> Cell m (RequestSum m as)
-> ProductF (Cell m) (Codomains (Domains a b : fs))
-> (RequestSum m (Domains a b : fs) -> RequestSum m as)
-> CallbackProd m (Domains a b : fs)
mkCallback
inputs
(OneFunctor output outputs)
f =
OneMT
( Callback $ \i -> do
put inputs
$ f
$ ThisMT
$ Request i
$ put output
take output
)
$ mkCallback inputs outputs (f . NextMT)
----- exercise the lib --
newtype Tracer = Tracer (forall a. Show a => a -> IO ())
{-| A service that takes two callbacks and a tracer. It loops forever tracking
the count of loops. It calls the 2 callbacks in sequence, using the result of
the first as input for the second.
It tries to be smart by exposing call backs in IO so that we can do our shit
but we are going to feed to dummy callbacks that actually write the inputs in
a cell and retrieve the outputs from other cell
Thius specific implementation forces the 2 inputs to be queued which is not
specified in the interface of the service. It's the transformation that give us
control.
-}
service
:: Tracer
-> (Int -> IO Int)
-> (String -> IO Int)
-> IO ()
service (Tracer logMe) cb1 cb2 = ($ 0) $ fix $ \go n -> do
logMe ("service count", n)
x <- cb1 $ n
logMe ("service 1", x)
threadDelay 1_000_000
y <- cb2 $ replicate x 'a'
logMe ("service 2", y)
threadDelay 1_000_000
go $ succ n
-- | this is the type that will drive pull-to-push transformation
type Exercise = '[Domains Int Int, Domains String Int]
-- Here you see that in fact we are providing 2 pure callbacks. But it's really
-- up to us now to decide how to consume the service. We could fold over it and
-- apply in general all the controls that we can find in the Stream interface.
consumer
:: Tracer
-> (Int -> Int)
-- ^ the first callback
-> (String -> Int)
-- ^ the second callback
-> ElementStream IO Exercise ()
-- ^ the stream from the reversed service
-> IO ()
consumer (Tracer logMe) cb1 cb2 = fix $ \go s -> do
x <- inspect s
case x of
Left () -> pure ()
Right (This input continue) -> do
logMe ("consumer 1", input)
go $ continue $ cb1 input
Right (Next (This input continue)) -> do
logMe ("consumer 2", input)
go $ continue $ cb2 input
_ -> error "impossible, how do I fix this?"
{-| A tracer that logs to the console in a thread-safe way.
ContT is used to manage the thread acquisition and release.
-}
newQueuingTracer :: ContT r IO Tracer
newQueuingTracer = do
logs <- liftIO newTChanIO
_ <- ContT $ withAsync $ forever $ do
out <- atomically $ readTChan logs
putStrLn out
pure $ Tracer $ \x -> atomically $ writeTChan logs $ show x
test :: IO ()
test = evalContT $ do
-- create a new tracer
tracer <- newQueuingTracer
-- reify the callbacks and the stream from the 'Exercise' type
(producer, OneMT (Callback cb1) (OneMT (Callback cb2) NilMT)) <-
liftIO $ newPushToPull @Exercise newMVarCell
-- spawn the service with the dummy callbacks created
_ <- ContT $ withAsync $ service tracer cb1 cb2
-- consume the stream of requests from the service
liftIO
$ consumer
tracer
(\n -> if even n then n + 1 else n * 2)
(\s -> length s)
producer
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment