Last active
May 22, 2024 10:01
-
-
Save paolino/d17ebc1b1e01d7ded3455fd9f7c0dfec to your computer and use it in GitHub Desktop.
Push to pull transformation, from push callbacks to pull streaming, with generics
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE DataKinds #-} | |
{-# LANGUAGE DeriveFunctor #-} | |
{-# LANGUAGE FlexibleContexts #-} | |
{-# LANGUAGE FlexibleInstances #-} | |
{-# LANGUAGE GADTs #-} | |
{-# LANGUAGE InstanceSigs #-} | |
{-# LANGUAGE LambdaCase #-} | |
{-# LANGUAGE NumericUnderscores #-} | |
{-# LANGUAGE RankNTypes #-} | |
{-# LANGUAGE TypeApplications #-} | |
{-# LANGUAGE TypeFamilies #-} | |
{-# LANGUAGE TypeOperators #-} | |
module Control.PushPull.Step | |
where | |
import Prelude hiding (take) | |
import Control.Concurrent.MVar
    ( newEmptyMVar
    , putMVar
    , readMVar
    , takeMVar
    )
import Control.Monad.Fix (fix) | |
import Data.Kind (Type) | |
import Streaming | |
( Stream | |
, effect | |
, inspect | |
, wrap | |
) | |
-- These imports are for the exercise
import Control.Concurrent (threadDelay) | |
import Control.Concurrent.Async (withAsync) | |
import Control.Concurrent.STM | |
( atomically | |
, newTChanIO | |
, readTChan | |
, writeTChan | |
) | |
import Control.Monad (forever) | |
import Control.Monad.Cont (ContT (..), evalContT) | |
import Control.Monad.IO.Class (liftIO) | |
-- | A monadic callback: given an input of type @i@ it yields an output of
-- type @o@ inside the monad @m@.
newtype Callback m i o = Callback (i -> m o)
-- | A pending request: the input value paired with the continuation that is
-- used to hand back the matching output.
data Request m i o = Request i (o -> m ())
-- | Pairs the input type (domain) and the output type (codomain) of a callback.
data Domains i o = Domains i o
{-| The record of callbacks, one 'Callback' per 'Domains' entry of @ds@, that is
handed to a push-style service so that its calls can be consumed by pulling.
-}
type CallbackProd m ds = ProductMT Callback m ds
{-| One pure layer of the request stream, indexed by a list of 'Domains'.

'This' carries the input of the first entry of the list together with a
function from its output to the continuation @next@; 'Next' skips one entry of
the list.

The monad @m@ does not appear here: 'Stream' keeps pure layers and effectful
layers apart, so the effect of delivering an output is added by the stream
producer (see 'consLayer') rather than being stored in the layer itself.
-}
data ElementLayer ds next where
    This :: i -> (o -> next) -> ElementLayer (Domains i o ': ds) next
    Next :: ElementLayer ds next -> ElementLayer (d ': ds) next
-- | Maps over the continuation: post-composes under 'This', recurses under 'Next'.
instance Functor (ElementLayer as) where
    fmap g = \case
        This input k -> This input (g . k)
        Next inner -> Next (g <$> inner)
-- | A pullable stream whose layers are 'ElementLayer's over @ds@, running in @m@.
type ElementStream m ds = Stream (ElementLayer ds) m
-- | Type-level @map snd@: keeps the output type of every 'Domains' entry.
type family Codomains domains where
    Codomains '[] = '[]
    Codomains (Domains _input output ': rest) = output ': Codomains rest
{-| A slot that values can be put into and taken from. The abstraction exists
so that the rest of the module is not tied to 'IO' and MVars.
-}
data Cell m v = Cell
    { put :: v -> m ()
    -- ^ store a value in the cell
    , take :: m v
    -- ^ obtain a value from the cell
    }
-- | An action that allocates a fresh 'Cell' for any content type; a rank-2
-- synonym that is convenient to pass around.
type NewCell m = forall v. m (Cell m v)
{-| Create a new, initially empty 'Cell' backed by an MVar.

'put' is 'putMVar' (it waits while the cell is full) and 'take' is 'takeMVar'
(it waits while the cell is empty and then empties it). Emptying on 'take' is
essential: with a non-destructive read the same request would be delivered
again on every pull and every later 'put' would block on the still-full MVar.

This is the only implementation of the 'NewCell' type provided here. The
original author's note quotes a throughput of about 150k messages per second
for MVar (not measured here) and observes that a library exposing a streaming
interface directly would avoid this internal blocking.
-}
newMVarCell :: NewCell IO
newMVarCell = do
    mvar <- newEmptyMVar
    pure
        $ Cell
            { put = putMVar mvar
            , -- takeMVar, not readMVar: the value must be removed from the cell
              take = takeMVar mvar
            }
{-| Allocate the cells and build both halves of the push-to-pull bridge: the
stream to pull requests from and the record of callbacks to hand to the
push-style service.

One cell is allocated for the requests, then one cell per callback output.
-}
newPushToPull
    :: (MkRequestSumCells (Codomains fs), MkCallback fs, Monad m)
    => NewCell m
    -> m (ElementStream m fs (), CallbackProd m fs)
newPushToPull newCell =
    newCell >>= \requests ->
        mkVar newCell >>= \outputs ->
            pure (mkProducer requests, mkCallback requests outputs id)
{-| A heterogeneous record indexed by a list of 'Domains': the field for an
entry @Domains i o@ has type @f m i o@.

NOTE: the original author remarks that an @AllF@ type (not defined in this
file) cannot be reused for this purpose.
-}
data ProductMT f (m :: Type -> Type) ds where
    NilMT :: ProductMT f m '[]
    OneMT :: f m i o -> ProductMT f m ds -> ProductMT f m (Domains i o ': ds)
-- | A heterogeneous record indexed by a list of types: the field for an entry
-- @x@ has type @f x@.
data ProductF f xs where
    NilFunctor :: ProductF f '[]
    OneFunctor :: f x -> ProductF f xs -> ProductF f (x ': xs)
-- | A heterogeneous sum indexed by a list of 'Domains': it holds a single
-- @f m i o@ for exactly one entry @Domains i o@ of the list.
data SumMT f (m :: Type -> Type) ds where
    ThisMT :: f m i o -> SumMT f m (Domains i o ': ds)
    NextMT :: SumMT f m ds -> SumMT f m (d ': ds)
-- | The sum of all possible 'Request's for the callbacks listed in @ds@.
type RequestSum m ds = SumMT Request m ds
-- | Build the endless stream of requests: each step takes the next request
-- from the cell and wraps it as a layer whose continuation is the stream itself.
mkProducer :: Monad m => Cell m (RequestSum m fs) -> ElementStream m fs ()
mkProducer requests = producer
  where
    -- Tie the knot: the stream refers to itself as its own tail.
    producer = effect (wrap . consLayer producer <$> take requests)
-- | Turn one request into a stream layer. Feeding an output to the layer runs
-- the request's reply continuation and then resumes with the given stream.
consLayer
    :: Monad m
    => ElementStream m as ()
    -- ^ the stream to loop into
    -> RequestSum m fs
    -- ^ the request to expose as a layer
    -> ElementLayer fs (ElementStream m as ())
consLayer producer (ThisMT (Request input reply)) =
    This input (\output -> effect (reply output >> pure producer))
consLayer producer (NextMT rest) =
    Next (consLayer producer rest)
-- | A record with one 'Cell' per entry of @outs@, used to hold the outputs of
-- the callbacks.
type Outputs m outs = ProductF (Cell m) outs
-- | Type-level lists for which a record holding one fresh 'Cell' per entry can
-- be allocated.
class MkRequestSumCells os where
    mkVar :: Monad m => NewCell m -> m (Outputs m os)

-- | The empty list needs no cells.
instance MkRequestSumCells '[] where
    mkVar :: Monad m => NewCell m -> m (Outputs m '[])
    mkVar _ = pure NilFunctor

-- | Allocate the head cell first, then the cells for the rest of the list.
instance MkRequestSumCells os => MkRequestSumCells (o ': os) where
    mkVar :: Monad m => NewCell m -> m (Outputs m (o : os))
    mkVar newCell = OneFunctor <$> newCell <*> mkVar newCell
{-| Build the record of callbacks for the list @fs@, using

  * a common input 'Cell' that receives every request as a generic sum, and
  * a record of 'Cell's, one per callback, that receive the outputs.

Each callback wraps its input in a 'Request' whose continuation writes to the
callback's own output cell, injects it into the full request sum, puts it into
the common cell and then takes the reply from its output cell.
-}
class MkCallback fs where
    mkCallback
        :: Monad m
        => Cell m (RequestSum m as)
        -- ^ requests for the stream consumer
        -> ProductF (Cell m) (Codomains fs)
        -- ^ one output cell per callback in @fs@
        -> (RequestSum m fs -> RequestSum m as)
        -- ^ injection of a request for @fs@ into the full request sum
        -> CallbackProd m fs

-- | No callbacks to build for the empty list.
instance MkCallback '[] where
    mkCallback
        :: Cell m (RequestSum m as)
        -> ProductF (Cell m) (Codomains '[])
        -> (RequestSum m '[] -> RequestSum m as)
        -> CallbackProd m '[]
    mkCallback _ _ _ = NilMT

-- | Build the head callback, then recurse with the injection shifted by 'NextMT'.
instance MkCallback fs => MkCallback (Domains a b ': fs) where
    mkCallback
        :: Monad m
        => Cell m (RequestSum m as)
        -> ProductF (Cell m) (Codomains (Domains a b : fs))
        -> (RequestSum m (Domains a b : fs) -> RequestSum m as)
        -> CallbackProd m (Domains a b : fs)
    mkCallback requests (OneFunctor reply replies) inject =
        OneMT
            ( Callback $ \i ->
                -- publish the request, then wait for its answer
                put requests (inject (ThisMT (Request i (put reply))))
                    >> take reply
            )
            (mkCallback requests replies (inject . NextMT))
----- exercise the lib -- | |
-- | A logging action that accepts any value with a 'Show' instance.
newtype Tracer = Tracer (forall msg. Show msg => msg -> IO ())
{-| A push-style service used to exercise the library. It loops forever,
counting its iterations from 0. On every iteration it

  1. logs the count,
  2. calls the first callback with the count and logs the result @x@,
  3. waits one second,
  4. calls the second callback with a string of @x@ copies of @\'a\'@ and logs
     the result,
  5. waits one second.

The callbacks live in 'IO', so the caller may supply effectful ones; in this
module they are the generated callbacks that write inputs to a cell and read
outputs from another cell. That transformation queues the two calls, which the
interface of the service itself does not prescribe.
-}
service
    :: Tracer
    -> (Int -> IO Int)
    -> (String -> IO Int)
    -> IO ()
service (Tracer logMe) cb1 cb2 = loop 0
  where
    loop :: Int -> IO ()
    loop n = do
        logMe ("service count", n)
        x <- cb1 n
        logMe ("service 1", x)
        threadDelay 1_000_000
        y <- cb2 (replicate x 'a')
        logMe ("service 2", y)
        threadDelay 1_000_000
        loop (succ n)
-- | The list of callback shapes that drives the push-to-pull transformation in
-- the exercise: an @Int -> Int@ callback followed by a @String -> Int@ one.
type Exercise =
    '[ Domains Int Int
     , Domains String Int
     ]
-- Pull requests from the stream and answer them with two pure callbacks.
-- Because the service is now a stream, it is up to the consumer to decide how
-- to drive it: it could be folded over or controlled with anything else the
-- Stream interface offers.
consumer
    :: Tracer
    -> (Int -> Int)
    -- ^ the first callback
    -> (String -> Int)
    -- ^ the second callback
    -> ElementStream IO Exercise ()
    -- ^ the stream from the reversed service
    -> IO ()
consumer (Tracer logMe) cb1 cb2 = pull
  where
    pull :: ElementStream IO Exercise () -> IO ()
    pull stream = do
        step <- inspect stream
        case step of
            -- the stream ended
            Left () -> pure ()
            -- a request for the first callback
            Right (This input continue) -> do
                logMe ("consumer 1", input)
                pull (continue (cb1 input))
            -- a request for the second callback
            Right (Next (This input continue)) -> do
                logMe ("consumer 2", input)
                pull (continue (cb2 input))
            -- NOTE(review): the only remaining shape is @Next (Next e)@ with
            -- @e :: ElementLayer '[] _@, a type without constructors; matching
            -- on @e@ with an empty case (EmptyCase extension) could presumably
            -- replace this call to 'error' -- confirm.
            _ -> error "impossible, how do I fix this?"
{-| A tracer that sends every message through a queue to a single printing
thread, so that lines written from several threads are not interleaved.

The printing thread is started with 'withAsync' inside 'ContT', which scopes
its lifetime to the continuation.
-}
newQueuingTracer :: ContT r IO Tracer
newQueuingTracer = do
    queue <- liftIO newTChanIO
    -- background printer: pop one rendered message at a time and print it
    _ <- ContT . withAsync . forever $ atomically (readTChan queue) >>= putStrLn
    pure (Tracer (atomically . writeTChan queue . show))
-- | Wire everything together: run 'service' in a background thread with the
-- generated callbacks and answer its requests from the stream with 'consumer'.
test :: IO ()
test = evalContT $ do
    -- create a new tracer
    tracer <- newQueuingTracer
    -- reify the stream and the callbacks from the 'Exercise' type
    (requests, OneMT (Callback push1) (OneMT (Callback push2) NilMT)) <-
        liftIO (newPushToPull @Exercise newMVarCell)
    -- spawn the service with the generated callbacks
    _ <- ContT (withAsync (service tracer push1 push2))
    -- consume the stream of requests coming from the service
    liftIO (consumer tracer bump length requests)
  where
    -- answer for the first callback: even numbers go up by one, odd ones double
    bump :: Int -> Int
    bump n
        | even n = n + 1
        | otherwise = n * 2
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment