@gelisam
Last active October 25, 2022 04:48
A version of Tee, from the machines package, with N inputs instead of 2
-- in response to https://www.reddit.com/r/haskell/comments/yb9bi4/using_multiple_conduits_as_input_streams/
--
-- The challenge is to implement a function which takes in
-- three Conduits, and uses the values from the first
-- Conduit in order to decide which of the other two
-- Conduits to sample from next. Something like this:
--
-- example bools ints strings = do
--   maybeBool <- awaitMaybe bools
--   case maybeBool of
--     Nothing -> do
--       liftIO $ putStrLn "it's over"
--     Just True -> do
--       int <- await ints
--       yield (int + 4)
--       example bools ints strings
--     Just False -> do
--       str <- await strings
--       liftIO $ putStrLn str
--       yield $ length str
--       example bools ints strings
--
-- This three-Conduits-feeding-into-one structure seems
-- impossible, because Conduits are designed to be composed
-- into a flat pipeline. That is, the output of a single
-- upstream Conduit provides all the input values of the
-- downstream Conduit.
--
-- For this reason, the machines package [1] seems more
-- appropriate for this challenge. A Machine is pretty much
-- the same thing as a Conduit, with the added bonus that
-- Machines can be connected in a more complex diagram than
-- just a straight line.
--
-- Even with this more expressive API in hand, the challenge
-- is still not easy!
--
-- Oh, I should probably mention that machines and conduit
-- do _not_ run their pieces concurrently. With a
-- concurrent API, it would be pretty trivial to implement
-- the example function, by reading from one channel and
-- then the other. But here, instead of running computations
-- concurrently, we zip computation descriptions together,
-- lining up the instructions which request data with the
-- instructions which send data in order to obtain a simpler
-- computation description which neither requests nor
-- sends.
--
-- [1] https://hackage.haskell.org/package/machines
{-# LANGUAGE DataKinds, EmptyCase, GADTs, KindSignatures, RankNTypes, ScopedTypeVariables, TypeOperators #-}
module Main where
import Test.DocTest
import Control.Applicative ((<|>))
import Control.Monad.IO.Class (liftIO)
import Data.Machine
-- Machines with zero, one, and two inputs are given special
-- names:
--
-- type Source o = forall k. Machine k o
-- type Process a o = Machine (Is a) o
-- type Tee a b o = Machine (T a b) o
--
-- @Is a@ indicates that there is only one input, whose
-- elements have type @a@, while @T a b@ indicates that
-- there are two inputs, one whose elements have type @a@,
-- and one whose elements have type @b@.
--
-- The way in which these types indicate this is via the
-- number of constructors they have, and via their last type
-- parameter, which isn't shown above because @Is a@ and
-- @T a b@ have kind @* -> *@. Each constructor represents
-- one of the inputs, and the last type parameter specifies
-- the type of the elements received from that input:
--
-- data Is a i where
--   Refl :: Is a a
--
-- data T a b i where
--   L :: T a b a
--   R :: T a b b
--
-- For our challenge, we want a Machine which has three
-- inputs, so we will need to define our own datatype:
--
-- data T3 a b c i where
--   T1 :: T3 a b c a
--   T2 :: T3 a b c b
--   T3 :: T3 a b c c
--
-- type Tee3 a b c o = Machine (T3 a b c) o
--
-- The machines package provides functions for attaching
-- inputs to Processes and Tees:
--
-- cap :: Process a o -> Source a -> Source o
-- capL :: Source a -> Tee a b o -> Process b o
-- capR :: Source b -> Tee a b o -> Process a o
--
-- So we will need to define our own functions for
-- attaching input machines to a Tee3:
--
-- cap1 :: Source a -> Tee3 a b c o -> Tee b c o
-- cap2 :: Source b -> Tee3 a b c o -> Tee a c o
-- cap3 :: Source c -> Tee3 a b c o -> Tee a b o
--
-- This is especially sad because the work will need to be
-- repeated for Tee4, etc.
--
-- So I decided to do the work in a more general way, to
-- support an arbitrary number of inputs, so that nobody
-- else has to do this work ever again! You're welcome :)
-- Instead of 'Tee3', we need something more general, which
-- can select one type parameter from an arbitrarily-long
-- list of type parameters.
data Elem (as :: [*]) (a :: *) where
  Here
    :: Elem (a ': as) a
  There
    :: Elem as a
    -> Elem (b ': as) a
-- A value of type @Elem as@ selects one entry from the
-- type-level list @as@. If that list is empty, then there
-- is no way to select an entry from it, so @Elem '[]@ must
-- be uninhabited.
absurdElem
  :: Elem '[] a -> b
absurdElem elem_
  = case elem_ of {}
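
-- | A small illustration of how 'Elem' acts as an index into
-- the type-level list: 'Here' is position 0 and each 'There'
-- moves one position to the right. This helper is only a
-- sketch added for explanation; the name 'elemToIndex' is
-- hypothetical and nothing else in this file depends on it.
--
-- >>> elemToIndex (There (There Here))
-- 2
elemToIndex :: Elem as a -> Int
elemToIndex Here      = 0
elemToIndex (There e) = 1 + elemToIndex e
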
-- type Source o = PolyTee '[] o
-- type Process a o = PolyTee '[a] o
-- type Tee a b o = PolyTee '[a,b] o
-- type Tee3 a b c o = PolyTee '[a,b,c] o
type PolyTee as o = Machine (Elem as) o
type PolyTeeT m as o = MachineT m (Elem as) o
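
-- The correspondence sketched in the comments above can be
-- made concrete with 'fit', which translates the requests a
-- machine is allowed to make. The following is a minimal
-- sketch, assuming the 'Is' and 'T' constructors re-exported
-- by "Data.Machine"; the names 'isToElem', 'tToElem',
-- 'processToPolyTee' and 'teeToPolyTee' are hypothetical
-- helpers, not part of the machines package.

-- The single input of a Process is the head of a
-- one-element list.
isToElem :: Is a i -> Elem '[a] i
isToElem Refl = Here

-- The left input of a Tee is the first entry, the right
-- input is the second entry.
tToElem :: T a b i -> Elem '[a, b] i
tToElem L = Here
tToElem R = There Here

processToPolyTee
  :: Monad m
  => ProcessT m a o
  -> PolyTeeT m '[a] o
processToPolyTee p = fit isToElem p

teeToPolyTee
  :: Monad m
  => TeeT m a b o
  -> PolyTeeT m '[a, b] o
teeToPolyTee t = fit tToElem t
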
-- polyCapL :: Source a -> Process a o -> Source o -- cap
-- polyCapL :: Source a -> Tee a b o -> Process b o -- capL
-- polyCapL :: Source a -> Tee3 a b c o -> Tee b c o -- cap1
polyCapL
  :: forall m a1 as o. Monad m
  => SourceT m a1
  -> PolyTeeT m (a1 ': as) o
  -> PolyTeeT m as o
-- Eta-expanded (rather than @polyCapL = go@) so that the
-- definition also typechecks under the simplified
-- subsumption rules of GHC 9.0 and later.
polyCapL src1 teeN = go src1 teeN
  where
    -- specialize SourceT from
    --   (forall k. MachineT m k a1)
    -- to
    --   MachineT m (Elem '[]) a1
    -- so we can prove that it never Awaits.
    go :: MachineT m (Elem '[]) a1
       -> MachineT m (Elem (a1 ': as)) o
       -> MachineT m (Elem as) o
    go m1 mN = MachineT $ do
      stepN <- runMachineT mN
      case stepN of
        Stop -> do
          pure Stop
        Yield o ccN -> do
          pure $ Yield o
               $ go m1 ccN
        Await ccJust Here ccNothing -> do
          -- ccJust is the continuation which runs if the
          -- selected input m1 does yield a value
          -- downstream.
          -- ccNothing is the computation which runs if that
          -- input terminates early.
          step1 <- runMachineT m1
          case step1 of
            Stop -> do
              let mNothing = go stopped ccNothing
              runMachineT mNothing
            Yield a1 cc1 -> do
              -- The 'Yield' instruction from upstream lines
              -- up with an 'Await' instruction from
              -- downstream; remove both, yielding a simpler
              -- computation which performs neither
              -- instruction.
              let mJust a1_ = go cc1 (ccJust a1_)
              runMachineT (mJust a1)
            Await _ void1 _ -> do
              absurdElem void1
        Await ccJust (There e) ccNothing -> do
          -- The request targets one of the remaining inputs:
          -- re-emit it with the index shifted down by one.
          let mJust e_ = go m1 (ccJust e_)
          let mNothing = go m1 ccNothing
          pure $ Await mJust e mNothing
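
-- 'polyCapL' always plugs the /first/ input. One possible way
-- to plug a different input, such as the second one (the
-- @cap2@ from the discussion above), is to first reorder the
-- inputs with 'fit' and then call 'polyCapL'. This is only a
-- sketch; 'swap12' and 'cap2' are hypothetical names
-- introduced here for illustration.

-- Exchange the first two entries of the input list, leaving
-- the remaining entries where they are.
swap12 :: Elem (a ': b ': as) i -> Elem (b ': a ': as) i
swap12 Here              = There Here
swap12 (There Here)      = Here
swap12 (There (There e)) = There (There e)

cap2
  :: Monad m
  => SourceT m b
  -> PolyTeeT m (a ': b ': as) o
  -> PolyTeeT m (a ': as) o
cap2 src tee = polyCapL src (fit swap12 tee)
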
-- All the inputs have been plugged-in; convert the
-- 'PolyTee' into a 'Source' so that we can call 'run' on
-- it, or use it as input to something else.
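polyCapR
  :: forall m b. Monad m
  => PolyTeeT m '[] b
  -> SourceT m b
-- Eta-expanded so that the result can be generalized over
-- @k@ under GHC 9's simplified subsumption as well.
polyCapR tee
  = fit absurdElem tee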
-- |
-- Like 'awaits', but returns 'Nothing' instead of stopping
-- if the upstream machine has stopped.
awaitsMaybe
  :: forall k i o
   . k i
  -> Plan k o (Maybe i)
awaitsMaybe k
  = (Just <$> awaits k)
  <|> pure Nothing
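
-- | To see what 'awaitsMaybe' buys us, here is a minimal
-- sketch of an ordinary one-input 'Process' which keeps
-- running after its upstream has stopped: it adds up every
-- input and only yields the total once the input is
-- exhausted. With plain 'awaits' the plan would stop before
-- reaching the 'yield'. The name 'sumInputs' is hypothetical.
--
-- >>> run (sumInputs <~ source [1, 2, 3])
-- [6]
sumInputs :: Process Int Int
sumInputs = construct (go 0)
  where
    go :: Int -> Plan (Is Int) Int ()
    go acc = do
      maybeInt <- awaitsMaybe Refl
      case maybeInt of
        -- upstream is done: emit the running total and finish
        Nothing -> yield acc
        -- otherwise keep accumulating
        Just x  -> go (acc + x)
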
-- | We are finally ready to complete the challenge!
--
-- >>> :{
-- runT $ polyCapR
--      $ polyCapL (source ["foo", "bar", "quux"])
--      $ polyCapL (source [1..])
--      $ polyCapL (source [True, False, False, True, False])
--      $ example
-- :}
-- foo
-- bar
-- quux
-- it's over
-- [5,3,3,6,4]
example
  :: PolyTeeT IO '[Bool, Int, String] Int
example = construct go
  where
    go :: PlanT (Elem '[Bool, Int, String]) Int IO ()
    go = do
      maybeBool <- awaitsMaybe Here
      case maybeBool of
        Nothing -> do
          liftIO $ putStrLn "it's over"
        Just True -> do
          int <- awaits $ There Here
          yield (int + 4)
          go
        Just False -> do
          str <- awaits $ There $ There Here
          liftIO $ putStrLn str
          yield $ length str
          go

main :: IO ()
main = do
  putStrLn "typechecks."

test :: IO ()
test = do
  doctest ["src/Main.hs"]


gelisam commented Oct 25, 2022

See https://gist.github.com/gelisam/d789246eacfa0bfc75d28e2b492f9a7d for a solution which sticks to the conduit package.
