@jyp
Last active September 16, 2015 09:21
---
title: On the Duality of Streams
subtitle: How Can Linear Types Help to Solve the Lazy IO Problem?
author:
- name: Jean-Philippe Bernardy
- name: Josef Svenningsson
...
<!--
> {-# LANGUAGE ScopedTypeVariables, TypeOperators, RankNTypes, LiberalTypeSynonyms, BangPatterns, TypeSynonymInstances, FlexibleInstances, FlexibleContexts #-}
> module Organ where
> import System.IO
> import Control.Exception
> import Control.Concurrent (forkIO, readChan, writeChan, Chan, newChan, QSem, newQSem, waitQSem, signalQSem)
> import Control.Applicative hiding (empty)
> import Data.IORef
> import Data.Monoid
> import Prelude hiding (tail)
> import Control.Monad (ap)
-->
\begin{abstract}
In this paper, we present a novel stream-programming library for
Haskell. Like other coroutine-based stream libraries, our library
allows synchronous execution, which implies that effects are run in
lockstep and no buffering occurs.
A novelty of our implementation is that it allows buffering or
re-scheduling of effects to be introduced locally. The buffering requirements (or
re-scheduling opportunities) are indicated by the type-system.
Our library is based on a number of design principles, adapted from
the theory of Girard's Linear Logic. These principles are applicable
to the design of any Haskell structure where resource management
(memory, IO, ...) is critical.
\end{abstract}
\category{D.1.1}{Applicative (Functional) Programming}{}
\category{D.3.3}{Language Constructs and Features}{Coroutines}
\keywords
Streams, Continuations, Linear Types
Introduction
============
As \citet{hughes_functional_1989} famously noted, the strength of
functional programming languages resides in the composition mechanisms
that they provide. That is, simple components can be built and
understood in isolation; one does not need to worry about interference
effects when composing them. In particular, lazy evaluation makes it possible to
construct complex programs by pipelining simple list transformation
functions. Indeed, while strict evaluation forces each
intermediate result to be fully reified between computational steps, lazy
evaluation allows all the computations to run concurrently, often
without ever allocating more than a single intermediate element at a time.
Unfortunately, lazy evaluation suffers from two drawbacks. First, it
has unpredictable memory behavior. Consider the following function
composition:
< f :: [a] -> [b]
< g :: [b] -> [c]
< h = g . f
One hopes that, at run-time, the intermediate list ($[b]$)
will only be allocated element-wise, as outlined above. Unfortunately,
this desired behavior does not always happen. Indeed, a
necessary condition is that the production pattern of $f$ matches the
consumption pattern of $g$; otherwise buffering occurs. In practice,
this means that a seemingly innocuous change in either of the function
definitions may drastically change the memory behavior of the
composition, without warning. If one cares about memory behavior,
this means that the compositionality principle touted by Hughes breaks
down.
Second, lazy evaluation does not extend nicely to effectful
processing. That is, if (say) an input list is produced by reading a
file lazily, one is exposed to losing referential transparency (as
\citet{kiselyov_lazy_2013} has shown). For example, one may rightfully
expect\footnote{This expectation is expressed in a
Stack Overflow question, accessible at this URL:
http://stackoverflow.com/questions/296792/haskell-io-and-closing-files
} that both following programs have the same behavior:
< main = do inFile <- openFile "foo" ReadMode
<           contents <- hGetContents inFile
<           putStr contents
<           hClose inFile
<
< main = do inFile <- openFile "foo" ReadMode
<           contents <- hGetContents inFile
<           hClose inFile
<           putStr contents
Indeed, the \var{putStr} and \var{hClose} commands act on unrelated
resources, and thus swapping them should have no observable effect.
However, while the first program prints the `foo` file, the second one
prints nothing. Indeed, because \var{hGetContents} reads the file
lazily, the \var{hClose} operation has the effect of truncating the
list. In the first program, printing the contents forces reading the
file. One may argue that \var{hClose} should not be called in the
first place --- but then, closing the handle happens only when the
\var{contents} list can be garbage collected (in full), and relying on
garbage collection for cleaning resources is brittle; furthermore
this effect compounds badly with the first issue discussed above. If
one wants to use lazy effectful computations, again, the
compositionality principle is lost.
In this paper, we propose to tackle both of these issues by mimicking
the computational behavior of Girard's linear logic
\cite{girard_linear_1987} in Haskell. In fact, one way to read this
paper is as an advocacy for linear types support in Haskell. While
Kiselyov's *iteratees* (\citeyear{kiselyov_iteratees_2012}) already
solve the issues described above, our grounding in linear logic
yields a rich structure of types for data streams, capturing
various production and consumption patterns.
First, the type corresponding to on-demand production of elements is called a
source (\var{Src}). An adaptation of the first code example above to
use sources would look as follows, and give the guarantee that the
composition does not allocate more memory than the sum of its
components.
< f :: Src a -> Src b
< g :: Src b -> Src c
< h = g . f
Second, the type driving the consumption of elements is called a sink
(\var{Snk}). For example, the standard output is naturally given a
sink type:
< stdoutSnk :: Snk String
Using it, we can implement the printing of a file as follows, and
guarantee the timely release of resources, even in the presence of
exceptions:
> main = fileSrc "foo" `fwd` stdoutSnk
In the above, \var{fileSrc} provides the contents of a file, and
\var{fwd} forwards data from a source to a sink. The types are as
follows:
< fileSrc :: FilePath -> Src String
< fwd :: Src a -> Snk a -> IO ()
Sources provide data on-demand, while sinks decide when they are ready
to consume data. This is an instance of the push/pull duality. In
general, push-streams control the flow of computation, while
pull-streams respond to it. We will see that this polarization does
not need to match the flow of data. We support in particular data
sources with push-flavor, called co-sources (\var{CoSrc}).
Co-sources are useful, for example, when a data stream needs precise
control over the execution of the effects it embeds (see
Sec. \ref{async}). For example, sources cannot be demultiplexed, but
co-sources can.
In a program which uses both sources and co-sources, the need might
arise to compose a function which returns a co-source with a function
which takes a source as input: this is the situation where list-based
programs would silently cause memory allocation. In our approach, this
mismatch is caught by the type system, and the user must explicitly
conjure a buffer to be able to write the composition:
< f :: Src a -> CoSrc b
< g :: Src b -> Src c
< h = g . buffer . f
The contributions of this paper are:
* The formulation of principles for compositional resource-aware
programming in Haskell (resources include memory and files). The
principles are linearity, duality, and polarization. While borrowed
from linear logic, as far as we know they have not been applied to
Haskell programming before.
* An embodiment of the above principles, in the form of a Haskell
library for streaming `IO`. Besides supporting compositionality as
outlined above, our library features two concrete novel aspects:
1. A more lightweight design than state-of-the-art co-routine based
libraries.
2. Support for explicit buffering and control structures, while
still respecting compositionality (Sec. \ref{async}).
\paragraph{Outline} The rest of the paper is structured as follows.
In Sec. \ref{negations}, we recall the notion of continuations in the presence of effects.
In Sec. \ref{streams}, we present our design for streams, and justify it by appealing to linearity principles.
In Sec. \ref{effect-free-streams}, we give an API to program with streams, and analyze their algebraic structure.
In Sec. \ref{effectful-streams}, we show how to embed IO into streams.
In Sec. \ref{async}, we discuss polarity mismatch.
Related work and future work are discussed respectively in sections \ref{related-work} and \ref{future-work}.
We conclude in Sec. \ref{conclusion}.
Preliminary: negation and continuations
=======================================
\label{negations}
In this section we recall the basics of continuation-based
programming. We introduce our notation, and justify effectful
continuations.
We begin by assuming a type of effects \var{Eff}, which we keep
abstract for now. We can then define negation as follows:
> type N a = a -> Eff
A shortcut for double negations is also convenient.
> type NN a = N (N a)
The basic idea (imported from classical logic) pervading this paper
is that producing a result of type α is equivalent to consuming an
argument of type $N α$. Dually, consuming an argument of type α is
equivalent to producing a result of type $N α$. In this paper we call
these equivalences the duality principle.
In classical logic, negation is involutive; that is, $\var{NN}\,α = α$.
However, because we work within Haskell, we do not have this
equality\footnote{Even though
\citet{munch-maccagnoni_formulae-as-types_2014} achieves an involutive
negation in an intuitionistic language, he does so by stack
manipulation, which is not available in Haskell.}. We can come close
enough though. First, double negations can always be introduced,
using the \var{shift} operator:
> shift :: a -> NN a
> shift x k = k x
Second, it is possible to remove double negations, but only if an
effect can be outputted. Equivalently, triple negations can be
collapsed to a single one:
> unshift :: N (NN a) -> N a
> unshift k x = k (shift x)
The above two functions are the \var{return} and \var{join} of the
double negation monad\footnote{for \var{join}, substitute $N\,a$ for
$a$}; indeed adding a double negation in the type corresponds to
sending the return value to its consumer. However, we will not be
using this monadic structure anywhere in the following. Indeed, single
negations play a central role in our approach, and the monadic
structure is a mere diversion.
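
The following self-contained sketch illustrates how \var{shift} and \var{unshift} behave once effects are made concrete. It assumes $\var{Eff} = \var{IO} ()$ (the choice made later in this paper); the helper \var{runNN} is hypothetical and exists only to observe the value carried by a double negation.

```haskell
import Data.IORef

type Eff  = IO ()      -- assumption: effects are plain IO actions
type N a  = a -> Eff
type NN a = N (N a)

-- Introduce a double negation: hand the value to its consumer.
shift :: a -> NN a
shift x k = k x

-- Collapse a triple negation into a single one.
unshift :: N (NN a) -> N a
unshift k x = k (shift x)

-- Hypothetical helper: run a doubly negated value with a continuation
-- that stores whatever it receives in a mutable cell.
runNN :: NN a -> IO (Maybe a)
runNN m = do
  ref <- newIORef Nothing
  m (writeIORef ref . Just)
  readIORef ref
```

Under these assumptions, `runNN (shift 42)` yields `Just 42`: the continuation is invoked exactly once, with the shifted value.
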
Structure of Effects
--------------------
When dealing with purely functional programs, continuations have no
effects. In this case, one can let \var{Eff} remain abstract, or
define it to be the empty type: $\var{Eff} = \bot$. This is also the
natural choice when interpreting the original linear logic of
\citet{girard_linear_1987}.
The pure logic makes no requirement on effects, but interpretations
may choose to impose a richer structure on them. Such interpretations
would then not be complete with respect to the logic --- but they
would remain sound.
In our case, we first require \var{Eff} to be a monoid. Its unit
(\var{mempty}) corresponds to program termination, while the operator
(\var{mappend}) corresponds to sequential composition of effects.
(This structure is standard to interpret the \smallcaps{halt} and
\smallcaps{mix} rules in linear logic
\citep{bernardy_composable_2015,mellis_resource_2010}.)
For users of the stream library, \var{Eff} will remain an abstract
monoid. However in this paper we will develop concrete effectful
streams, and therefore we greatly extend the structure of effects. In
fact, because we will provide streams interacting with files and other
operating-system resources, and write the whole code in standard
Haskell, we must pick $\var{Eff} = \var{IO} ()$, and ensure that
\var{Eff} can be treated as a monoid.
> type Eff = IO ()
> instance Monoid Eff where
>   mempty = return ()
>   mappend = (>>)
The parts of the code which know about $\var{Eff} = \var{IO} ()$ must
be carefully written. The type system provides no particular
guarantees about such code. These IO-interacting functions do not
interpret any standard fragment of linear logic: they are non-standard
extensions of its model.
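
As a small illustration of the monoid structure, the sketch below sequences two effects with the monoid operator. It assumes a recent compiler (GHC 8.4 or later), where `base` already provides a `Monoid` instance for `IO a` whenever `a` is a monoid, so no instance is declared here; for `IO ()` that instance sequences actions from left to right. The names \var{logTo} and \var{sequenced} are hypothetical.

```haskell
import Data.IORef

type Eff = IO ()   -- assumption: effects are plain IO actions

-- Hypothetical helper: an effect that appends a label to a log.
logTo :: IORef [String] -> String -> Eff
logTo ref s = modifyIORef ref (++ [s])

-- Sequential composition of effects via (<>); mempty is the effect
-- that does nothing, so it can be inserted anywhere.
sequenced :: IORef [String] -> Eff
sequenced ref = logTo ref "first" <> mempty <> logTo ref "second"
```

Running \var{sequenced} on an empty log leaves it containing `["first", "second"]`.
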
Streams
=======
Our guiding design principle is duality. This principle is reflected in
the design of the streaming library: we not only have a type for
sources of data but also a type for sinks. For example, a simple
stream processor reading from a single source and writing to a single
sink will be given the following type:
< simple :: Src a -> Snk a -> Eff
We will make sure that \var{Snk} is the negation of a source (and vice
versa), and thus the type of the above program may equivalently have
been written as follows:
< simple :: Src a -> Src a
However, having explicit access to sinks allows us to (for example)
dispatch a single source to multiple sinks, as in the following type signature:
< forkSrc :: Src (a,b) -> Snk a -> Snk b -> Eff
Familiarity with duality will be crucial in the later sections of this paper.
We define sources and sinks by mutual recursion. Producing a
source means selecting whether more data is available (\var{Cons}) or not
(\var{Nil}). If there is data, one must then produce a data item and
*consume* a sink.
> data Source a = Nil | Cons a (N (Sink a))
> data Sink a = Full | Cont (N (Source a))
Producing a sink means selecting whether one can accept more elements
(\var{Cont}) or not (\var{Full}). In the former case, one must then be
able to consume a source. The \var{Full} case is useful when the sink
bails out early, for example when it encounters an exception.
Note that, in order to produce (or consume) the next element, the
source (or sink) must handle the effects generated by the other side
of the stream before proceeding. This means that each production is
matched by a consumption, and \textit{vice versa}.
Linearity
---------
For streams to be used safely, one can neither discard nor duplicate them,
for otherwise effects may be discarded or duplicated, which is
dangerous. For example, the same file could be closed twice, or not
at all. Indeed, the last action of a sink will typically be closing
the file. Timely closing of the sink can only be guaranteed if the
actions are run until reaching the end of the pipe (either \var{Full}
or \var{Nil}). In the rest of the section we precisely define the condition
that programs need to respect in order to safely use our streams.
The first notion that we need to define is that of an effectful type:
* The type \var{Eff} is effectful
* A function type is effectful if the co-domain is effectful
* A product type is effectful if any of its operands is effectful
* A sum type is effectful if any of its operands is effectful
* A type variable is not effectful
Further, we say that a variable with an effectful type is itself effectful.
The linearity convention is then respected iff:
1. No effectful variable may be duplicated or shared. In particular,
if passed as an argument to a function it may not be used again.
2. Every effectful variable must be consumed (or passed to a function, which
will be in charge of consuming it).
3. A type variable α cannot be instantiated to an effectful type.
In this paper, the linearity convention is enforced by manual
inspection. Manual inspection is unreliable, but whether the linearity
convention is respected can be algorithmically decided. (See
Sec. \ref{future-work}.)
The third restriction (instantiation of type-variables) means that
effectful types cannot be used in standard polymorphic Haskell
functions. This is a severe restriction, but it gives enough leeway to
implement a full-fledged stream library, as we do below. (Yet some
approaches to lift this limitation have been proposed, e.g. by
\citet{mazurak_lightweight_2010}.)
One might think that the above restriction fails to take into account
captured environments in functions. Indeed, one can write the following
function, which may be duplicated, but runs linear effects.
> oops :: (() -> Eff) -> IO Bool
> oops k = do ignore <- k ()
>             return True
However, writing such a function requires knowing that $\var{Eff} =
\var{IO} ()$, and is therefore disallowed by the Haskell type system
in user code, where \var{Eff} is kept abstract. (The \var{mappend}
function may combine two effects in one, but not discard or duplicate
them.)
Basics
------
We begin by presenting three basic functions to manipulate
\var{Source} and \var{Sink}: one to read from sources, one to write
to sinks, and one to connect sources and sinks.
\paragraph{Reading}
One may want to provide the following function, waiting for data to be
produced by a source. The second argument is the effect to run if no
data is produced, and the third is the effect to run given the data
and the remaining source.
> await :: Source a -> Eff -> (a -> Source a -> Eff) -> Eff
> await Nil eof _ = eof
> await (Cons x cs) _ k = cs $ Cont $ \xs -> k x xs
However, the above function breaks the linearity invariant, so we will
refrain from using it as such. The pattern that it defines is still
useful: it is valid when the second and third arguments consume the
same set of variables. Indeed, this condition is often satisfied.
\paragraph{Writing}
One can send data to a sink. If the sink is full, the data is ignored.
The third argument is a continuation receiving the "new" sink, that is,
the one obtained after the "old" sink has consumed the data.
> yield :: a -> Sink a -> (Sink a -> Eff) -> Eff
> yield x (Cont c) k = c (Cons x k)
> yield _ Full k = k Full
\paragraph{Forwarding}
One can forward the data from a source to a sink, as follows. The
effect generated by this operation is the combined effect of all
productions and consumptions on the stream.
> forward :: Source a -> Sink a -> Eff
> forward s (Cont s') = s' s
> forward Nil Full = mempty
> forward (Cons _ xs) Full = xs Full
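
To make the behavior of \var{yield} and \var{forward} concrete, here is a self-contained sketch with an in-memory source and sink. It assumes $\var{Eff} = \var{IO} ()$; the names \var{listSource} and \var{logSink} are hypothetical helpers introduced for illustration and are not part of the library.

```haskell
import Data.IORef

type Eff = IO ()                       -- assumption: effects are IO actions
type N a = a -> Eff
data Source a = Nil  | Cons a (N (Sink a))
data Sink a   = Full | Cont (N (Source a))

yield :: a -> Sink a -> (Sink a -> Eff) -> Eff
yield x (Cont c) k = c (Cons x k)
yield _ Full     k = k Full

forward :: Source a -> Sink a -> Eff
forward s (Cont s')      = s' s
forward Nil Full         = return ()
forward (Cons _ xs) Full = xs Full

-- Hypothetical: a concrete source holding the elements of a list.
listSource :: [a] -> Source a
listSource []       = Nil
listSource (x : xs) = Cons x (forward (listSource xs))

-- Hypothetical: a sink that appends each received element to a log,
-- then offers itself again to the rest of the source.
logSink :: IORef [a] -> Sink a
logSink ref = Cont step
  where
    step Nil        = return ()
    step (Cons x k) = modifyIORef ref (++ [x]) >> k (logSink ref)
```

With these definitions, `forward (listSource [1, 2, 3]) (logSink ref)` leaves `[1, 2, 3]` in the log, each element being logged before the next one is produced.
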
Baking in negations: exercise in duality
-------------------
Programming with \var{Source} and \var{Sink} explicitly is
inherently continuation-heavy: negations must be explicitly added in
many places. This style is somewhat inconvenient; therefore, we will
instead use pre-negated versions of sources and sinks:
> type Src a = N (Sink a)
> type Snk a = N (Source a)
These definitions have the added advantage of perfecting the duality
between sources and sinks, while not restricting the programs one can
write.
Indeed, one can access the underlying structure as follows:
> onSource :: (Src a -> t) -> Source a -> t
> onSink :: (Snk a -> t) -> Sink a -> t
> onSource f s = f (\t -> forward s t)
> onSink f t = f (\s -> forward s t)
And, while a negated \var{Sink} cannot be converted to a
\var{Source}, all the following conversions are implementable:
> unshiftSnk :: N (Src a) -> Snk a
> unshiftSrc :: N (Snk a) -> Src a
> shiftSnk :: Snk a -> N (Src a)
> shiftSrc :: Src a -> N (Snk a)
> unshiftSnk = onSource
> unshiftSrc = onSink
> shiftSnk k kk = kk (Cont k)
> shiftSrc k kk = k (Cont kk)
A different reading of the type of \var{shiftSrc} reveals that it implements
forwarding of data from \var{Src} to \var{Snk}:
> fwd :: Src a -> Snk a -> Eff
> fwd = shiftSrc
In particular, one can flip sink transformers to obtain source transformers,
and vice versa.
> flipSnk :: (Snk a -> Snk b) -> Src b -> Src a
> flipSnk f s = shiftSrc s . onSink f
> flipSrc :: (Src a -> Src b) -> Snk b -> Snk a
> flipSrc f t = shiftSnk t . onSource f
Flipping allows one to choose the most convenient direction to
implement, and get the other one for free. Consider as an example the
implementation of the mapping functions:
> mapSrc :: (a -> b) -> Src a -> Src b
> mapSnk :: (b -> a) -> Snk a -> Snk b
Mapping sources is defined by flipping mapping of sinks:
> mapSrc f = flipSnk (mapSnk f)
Sink mapping is defined by case analysis on the concrete
source, and the recursive case conveniently calls \var{mapSrc}.
> mapSnk _ snk Nil = snk Nil
> mapSnk f snk (Cons a s)
>   = snk (Cons (f a) (mapSrc f s))
When using double negations, it is sometimes useful to insert or
remove them inside type constructors. For sources and sinks, one proceeds
as follows. Introduction of double negation in sources and its elimination
in sinks is a special case of mapping.
> nnIntro :: Src a -> Src (NN a)
> nnIntro = mapSrc shift
> nnElim' :: Snk (NN a) -> Snk a
> nnElim' = mapSnk shift
The duals are easily implemented by case analysis, following the mutual
recursion pattern introduced above.
> nnElim :: Src (NN a) -> Src a
> nnIntro' :: Snk a -> Snk (NN a)
> nnElim = flipSnk nnIntro'
> nnIntro' k Nil = k Nil
> nnIntro' k (Cons x xs) = x $ \x' -> k (Cons x' $ nnElim xs)
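
The interplay between flipping and mapping can be exercised with the following self-contained sketch. It assumes $\var{Eff} = \var{IO} ()$. The function \var{fromList} is given here as one possible implementation of the signature listed later in this paper, and \var{collect} is a hypothetical test helper; neither is claimed to match the library's actual code.

```haskell
import Data.IORef

type Eff = IO ()                       -- assumption: effects are IO actions
type N a = a -> Eff
data Source a = Nil  | Cons a (N (Sink a))
data Sink a   = Full | Cont (N (Source a))
type Src a = N (Sink a)
type Snk a = N (Source a)

forward :: Source a -> Sink a -> Eff
forward s (Cont s')      = s' s
forward Nil Full         = return ()
forward (Cons _ xs) Full = xs Full

onSink :: (Snk a -> t) -> Sink a -> t
onSink f t = f (\s -> forward s t)

shiftSrc :: Src a -> N (Snk a)
shiftSrc k kk = k (Cont kk)

flipSnk :: (Snk a -> Snk b) -> Src b -> Src a
flipSnk f s = shiftSrc s . onSink f

mapSrc :: (a -> b) -> Src a -> Src b
mapSrc f = flipSnk (mapSnk f)

mapSnk :: (b -> a) -> Snk a -> Snk b
mapSnk _ snk Nil        = snk Nil
mapSnk f snk (Cons a s) = snk (Cons (f a) (mapSrc f s))

-- Sketch (assumption): a source producing the elements of a list.
fromList :: [a] -> Src a
fromList _        Full     = return ()
fromList []       (Cont k) = k Nil
fromList (x : xs) (Cont k) = k (Cons x (fromList xs))

-- Hypothetical test helper: drain a source into a list.
collect :: Src a -> IO [a]
collect src = do
  ref <- newIORef []
  src (Cont (go ref []))
  readIORef ref
  where
    go ref acc Nil        = writeIORef ref (reverse acc)
    go ref acc (Cons x k) = k (Cont (go ref (x : acc)))
```

For instance, `collect (mapSrc (* 2) (fromList [1, 2, 3]))` returns `[2, 4, 6]`, even though only the sink direction of the mapping was written by case analysis.
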
Effect-Free Streams
===================
The functions seen so far make no use of the fact that \var{Eff} can
embed IO actions. In fact, a large number of useful functions over
streams can be implemented without relying on IO. We give an overview
of effect-free streams in this section.
List-Like API
-------------
To begin, we show that one can implement a list-like API for
sources, as follows:
> empty :: Src a
> empty sink' = forward Nil sink'
> cons :: a -> Src a -> Src a
> cons a s s' = yield a s' s
> tail :: Src a -> Src a
> tail = flipSnk $ \t s -> case s of
>   Nil -> t Nil
>   Cons _ xs -> fwd xs t
(Taking just the head is not meaningful due to the linearity
constraint.)
Dually, the full sink is simply
> plug :: Snk a
> plug source' = forward source' Full
Another useful function is the equivalent of \var{take} on lists.
Given a source, we can create a new source which ignores all but its
first $n$ elements. Conversely, we can prune a sink to consume only
the first $n$ elements of a source.
> takeSrc :: Int -> Src a -> Src a
> takeSnk :: Int -> Snk a -> Snk a
The natural implementation is again by mutual recursion. The main
subtlety is that, when reaching the $n$th element, both ends of the
stream must be notified of its closing. Note the use of the monoidal
structure of \var{Eff} in this case.
> takeSrc i = flipSnk (takeSnk i)
> takeSnk _ s Nil = s Nil
> takeSnk 0 s (Cons _ s') = s Nil <> s' Full
> takeSnk i s (Cons a s') = s (Cons a (takeSrc (i-1) s'))
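
The sketch below checks that pruning notifies both ends of the stream. It assumes $\var{Eff} = \var{IO} ()$ and a compiler where `(<>)` is exported by the Prelude (GHC 8.4 or later). The infinite source \var{natsFrom}, which records when it receives \var{Full}, and the helper \var{collect} are hypothetical.

```haskell
import Data.IORef

type Eff = IO ()                       -- assumption: effects are IO actions
type N a = a -> Eff
data Source a = Nil  | Cons a (N (Sink a))
data Sink a   = Full | Cont (N (Source a))
type Src a = N (Sink a)
type Snk a = N (Source a)

forward :: Source a -> Sink a -> Eff
forward s (Cont s')      = s' s
forward Nil Full         = return ()
forward (Cons _ xs) Full = xs Full

flipSnk :: (Snk a -> Snk b) -> Src b -> Src a
flipSnk f s t = s (Cont (f (\x -> forward x t)))

takeSrc :: Int -> Src a -> Src a
takeSrc i = flipSnk (takeSnk i)

takeSnk :: Int -> Snk a -> Snk a
takeSnk _ s Nil         = s Nil
takeSnk 0 s (Cons _ s') = s Nil <> s' Full   -- close downstream, then upstream
takeSnk i s (Cons a s') = s (Cons a (takeSrc (i - 1) s'))

-- Hypothetical: an infinite source that records when it is closed.
natsFrom :: IORef Bool -> Int -> Src Int
natsFrom closed _ Full     = writeIORef closed True
natsFrom closed n (Cont k) = k (Cons n (natsFrom closed (n + 1)))

-- Hypothetical test helper: drain a source into a list.
collect :: Src a -> IO [a]
collect src = do
  ref <- newIORef []
  src (Cont (go ref []))
  readIORef ref
  where
    go ref acc Nil        = writeIORef ref (reverse acc)
    go ref acc (Cons x k) = k (Cont (go ref (x : acc)))
```

Here \var{flipSnk} is written in an unfolded form that is equivalent to the composition of \var{shiftSrc} and \var{onSink}. Collecting `takeSrc 3 (natsFrom closed 0)` gives `[0, 1, 2]` and sets the `closed` flag, which shows that the upstream source is told to stop rather than being silently abandoned.
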
Algebraic structure
-------------------
Sources and sinks each form a monoid under concatenation:
> instance Monoid (Src a) where
>   mappend = appendSrc
>   mempty = empty
> instance Monoid (Snk a) where
>   mappend = appendSnk
>   mempty = plug
We have already encountered the units (\var{empty} and \var{plug});
the appending operations are defined below. Intuitively,
\var{appendSrc} first gives control to the first source until it runs
out of elements and then turns control over to the second source. This
behavior is implemented in the helper function \var{forwardThenSnk}.
> appendSrc :: Src a -> Src a -> Src a
> appendSrc s1 s2 Full = s1 Full <> s2 Full
> appendSrc s1 s2 (Cont s)
>   = s1 (Cont (forwardThenSnk s s2))
> forwardThenSnk :: Snk a -> Src a -> Snk a
> forwardThenSnk snk src Nil = fwd src snk
> forwardThenSnk snk src (Cons a s)
>   = snk (Cons a (appendSrc s src))
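
As a quick check of the source-appending behavior, the following self-contained sketch concatenates two list-backed sources. It assumes $\var{Eff} = \var{IO} ()$ and GHC 8.4 or later (for `(<>)` in the Prelude); \var{fromList} is one possible implementation of the signature given later, and \var{collect} is a hypothetical test helper.

```haskell
import Data.IORef

type Eff = IO ()                       -- assumption: effects are IO actions
type N a = a -> Eff
data Source a = Nil  | Cons a (N (Sink a))
data Sink a   = Full | Cont (N (Source a))
type Src a = N (Sink a)
type Snk a = N (Source a)

fwd :: Src a -> Snk a -> Eff
fwd src snk = src (Cont snk)

appendSrc :: Src a -> Src a -> Src a
appendSrc s1 s2 Full     = s1 Full <> s2 Full   -- close both sources
appendSrc s1 s2 (Cont s) = s1 (Cont (forwardThenSnk s s2))

-- Feed the sink from the current source; once it ends, switch to src.
forwardThenSnk :: Snk a -> Src a -> Snk a
forwardThenSnk snk src Nil        = fwd src snk
forwardThenSnk snk src (Cons a s) = snk (Cons a (appendSrc s src))

-- Sketch (assumption): a source producing the elements of a list.
fromList :: [a] -> Src a
fromList _        Full     = return ()
fromList []       (Cont k) = k Nil
fromList (x : xs) (Cont k) = k (Cons x (fromList xs))

-- Hypothetical test helper: drain a source into a list.
collect :: Src a -> IO [a]
collect src = do
  ref <- newIORef []
  src (Cont (go ref []))
  readIORef ref
  where
    go ref acc Nil        = writeIORef ref (reverse acc)
    go ref acc (Cons x k) = k (Cont (go ref (x : acc)))
```

Collecting `appendSrc (fromList [1, 2]) (fromList [3])` yields `[1, 2, 3]`: the second source is only started once the first has signalled \var{Nil}.
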
Sinks can be appended in a similar fashion.
> appendSnk :: Snk a -> Snk a -> Snk a
> appendSnk s1 s2 Nil = s1 Nil <> s2 Nil
> appendSnk s1 s2 (Cons a s)
>   = s1 (Cons a (forwardThenSrc s2 s))
> forwardThenSrc :: Snk a -> Src a -> Src a
> forwardThenSrc s2 = flipSnk (appendSnk s2)
The operations \var{forwardThenSnk} and \var{forwardThenSrc} are akin
to taking the difference of sources and sinks, thus we find it
convenient to give them the following aliases:
> (-?) :: Snk a -> Src a -> Snk a
> t -? s = forwardThenSnk t s
> (-!) :: Snk a -> Src a -> Src a
> t -! s = forwardThenSrc t s
> infixr -!
> infixl -?
Appending and differences interact in the expected way: the following
observational equalities hold:
< t -? (s1 <> s2) == t -? s2 -? s1
< (t1 <> t2) -! s == t1 -! t2 -! s
<!--
Not sure if these are true or what
> prop_diff3 t1 t2 s = (t1 <> t2) -? s == t1 -? (t2 -! s)
> prop_diff4 t s1 s2 = t -! (s1 <> s2) == (t -? s1) -! s2
-->
The proofs for the above laws can be found in Appendix \ref{proof}.
\paragraph{Functor}
We have already seen the mapping functions for sources and sinks:
sources are functors and sinks are contravariant functors. (Given the
implementation of the morphism actions it is straightforward to check
the functor laws.)
Table of effect-free functions
------------------------------
The above already gives an extensive API for sources and sinks, and many
more useful effect-free functions can be implemented on this basis. We
give here a menu of functions that we have implemented, and whose
implementation is available in the appendix.
Zip two sources, and the dual.
> zipSrc :: Src a -> Src b -> Src (a,b)
> forkSnk :: Snk (a,b) -> Src a -> Snk b
<!--
or: forkSnk :: Snk (a,b) -> Snk a ⅋ Snk b
-->
Zip two sinks, and the dual.
> forkSrc :: Src (a,b) -> Snk a -> Src b
> zipSnk :: Snk a -> Snk b -> Snk (a,b)
Equivalent of \var{scanl'} for sources, and the dual.
> scanSrc :: (b -> a -> b) -> b -> Src a -> Src b
> scanSnk :: (b -> a -> b) -> b -> Snk b -> Snk a
Equivalent of \var{foldl'} for sources, and the dual.
> foldSrc' :: (b -> a -> b) -> b -> Src a -> NN b
> foldSnk' :: (b -> a -> b) -> b -> N b -> Snk a
Drop some elements from a source, and the dual.
> dropSrc :: Int -> Src a -> Src a
> dropSnk :: Int -> Snk a -> Snk a
Convert a list to a source, and vice versa.
> fromList :: [a] -> Src a
> toList :: Src a -> NN [a]
Split a source in lines, and the dual.
> linesSrc :: Src Char -> Src String
> unlinesSnk :: Snk String -> Snk Char
Consume elements until one of them satisfies the predicate; then the sink is
closed.
> untilSnk :: (a -> Bool) -> Snk a
Interleave two sources, and the dual.
> interleave :: Src a -> Src a -> Src a
> interleaveSnk :: Snk a -> Src a -> Snk a
Forward data coming from the input source to the result source and to
the second argument sink.
> tee :: Src a -> Snk a -> Src a
Filter a source, and the dual.
> filterSrc :: (a -> Bool) -> Src a -> Src a
> filterSnk :: (a -> Bool) -> Snk a -> Snk a
Turn a source of chunks of data into a single source; and the dual.
> unchunk :: Src [a] -> Src a
> chunkSnk :: Snk a -> Snk [a]
App: Stream-Based Parsing
-------------------------
To finish with effect-free functions, we give an example of a complex
stream processor, which turns a source of unstructured data into a
source of structured data, given a parser. This conversion is useful,
for example, to turn an XML file, provided as a stream of characters,
into a stream of (opening and closing) tags.
We begin by defining a pure parsing structure, modeled after the
parallel parsing processes of \citet{claessen_parallel_2004}. The
parser is continuation based, but the effects being accumulated are
parsing processes, defined as follows. The \var{Sym} constructor parses \var{Just}
a symbol, or \var{Nothing} if the end of stream is reached. A process may
also \var{Fail} or return a \var{Result}.
> data P s res = Sym (Maybe s -> P s res)
>              | Fail
>              | Result res
A parser produces the double negation of $a$:
> newtype Parser s a = P (forall res. (a -> P s res) -> P s res)
The monadic interface can then be built in the standard way:
> instance Monad (Parser s) where
>   return x = P $ \fut -> fut x
>   P f >>= k = P (\fut -> f (\a -> let P g = k a in g fut))
> instance Applicative (Parser s) where
>   pure = return
>   (<*>) = ap
> instance Functor (Parser s) where
>   -- defined directly: fmap = (<$>) would loop, since (<$>) is fmap
>   fmap f (P g) = P (\fut -> g (fut . f))
The essential parsing ingredient, choice, rests on the
ability to weave processes together; picking that which
succeeds first, and that which fails as last resort:
> weave :: P s a -> P s a -> P s a
> weave Fail x = x
> weave x Fail = x
> weave (Result res) y = Result res
> weave x (Result res) = Result res
> weave (Sym k1) (Sym k2)
>   = Sym (\s -> weave (k1 s) (k2 s))
> (<|>) :: Parser s a -> Parser s a -> Parser s a
> P p <|> P q = P (\fut -> weave (p fut) (q fut))
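
To see weaving at work independently of streams, the sketch below runs parsing processes directly over a list of symbols. The process type and \var{weave} are as above; the single-symbol process \var{symbol} and the list driver \var{runP} are hypothetical helpers introduced for illustration.

```haskell
data P s res = Sym (Maybe s -> P s res)
             | Fail
             | Result res

weave :: P s a -> P s a -> P s a
weave Fail x = x
weave x Fail = x
weave (Result res) _ = Result res
weave _ (Result res) = Result res
weave (Sym k1) (Sym k2) = Sym (\s -> weave (k1 s) (k2 s))

-- Hypothetical: accept exactly the given symbol and return r.
symbol :: Eq s => s -> res -> P s res
symbol c r = Sym (\m -> if m == Just c then Result r else Fail)

-- Hypothetical driver: feed a list to a process. At the end of the
-- input the process is told so once; if it still asks for more
-- symbols, the run is treated as a failure.
runP :: P s res -> [s] -> Maybe res
runP (Result r) _        = Just r
runP Fail       _        = Nothing
runP (Sym k)    (x : xs) = runP (k (Just x)) xs
runP (Sym k)    []       = case k Nothing of
  Result r -> Just r
  _        -> Nothing
```

With `p = weave (symbol 'a' 1) (symbol 'b' 2)`, running `runP p "b"` gives `Just 2`, `runP p "a"` gives `Just 1`, and `runP p "c"` gives `Nothing`: both alternatives read the same symbol in lockstep, and a failing branch simply disappears.
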
Parsing then reconciles the execution of the process with the
traversal of the source. In particular, whenever a result is
encountered, it is fed to the sink. If the parser fails, both ends of
the stream are closed.
> parse :: forall s a. Parser s a -> Src s -> Src a
> parse q@(P p0) = flipSnk $ scan $ p0 $ \x -> Result x
>   where
>     scan :: P s a -> Snk a -> Snk s
>     scan (Result res) ret xs = ret
>       (Cons res $ parse q $ forward xs)
>     scan Fail ret xs = ret Nil <> forward xs Full
>     scan (Sym f) mres xs = case xs of
>       Nil -> scan (f Nothing) mres Nil
>       Cons x cs -> fwd cs (scan (f $ Just x) mres)
Effectful streams
=================
So far, we have constructed only effect-free streams. That is, effects
could be any monoid, and in particular the unit type. In this
section we bridge this gap and provide some useful sources and sinks
performing IO effects, namely reading and writing to files.
We first define the following helper function, which sends data to a
handle, thereby constructing a sink.
> hFileSnk :: Handle -> Snk String
> hFileSnk h Nil = hClose h
> hFileSnk h (Cons c s) = do
>   hPutStrLn h c
>   s (Cont (hFileSnk h))
A file sink is then simply:
> fileSnk :: FilePath -> Snk String
> fileSnk file s = do
>   h <- openFile file WriteMode
>   hFileSnk h s
And the sink for standard output is:
> stdoutSnk :: Snk String
> stdoutSnk = hFileSnk stdout
(For ease of experimenting with our functions, the data items are
lines of text --- but an industrial variant would provide chunks of
raw binary data, to be further parsed.)
Conversely, a file source reads data from a file, as follows:
> hFileSrc :: Handle -> Src String
> hFileSrc h Full = hClose h
> hFileSrc h (Cont c) = do
>   e <- hIsEOF h
>   if e then do hClose h
>                c Nil
>        else do x <- hGetLine h
>                c (Cons x $ hFileSrc h)
> fileSrc :: FilePath -> Src String
> fileSrc file sink = do
>   h <- openFile file ReadMode
>   hFileSrc h sink
Combining the above primitives, we can then implement file copy as
follows:
> copyFile :: FilePath -> FilePath -> Eff
> copyFile source target = fwd (fileSrc source)
>                              (fileSnk target)
We emphasize that, when \var{copyFile} runs, reading and writing are
interleaved: in order to produce the next line in the source (in this
case by reading from the file), the current line must first be
consumed in the sink (in this case by writing it to disk). The stream
behaves fully synchronously, and no intermediate data is buffered.
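
The lockstep behavior can be observed without touching the file system. The following sketch replaces the file source and sink by in-memory stand-ins that record each "read" and "write" in a shared trace. It assumes $\var{Eff} = \var{IO} ()$; the names \var{tracedSrc}, \var{tracedSnk} and \var{copyTrace} are hypothetical.

```haskell
import Data.IORef

type Eff = IO ()                       -- assumption: effects are IO actions
type N a = a -> Eff
data Source a = Nil  | Cons a (N (Sink a))
data Sink a   = Full | Cont (N (Source a))
type Src a = N (Sink a)
type Snk a = N (Source a)

fwd :: Src a -> Snk a -> Eff
fwd src snk = src (Cont snk)

-- Hypothetical stand-in for a file source: "reads" from a list and
-- records each read in the trace.
tracedSrc :: IORef [String] -> [Int] -> Src Int
tracedSrc _  _        Full     = return ()
tracedSrc _  []       (Cont k) = k Nil
tracedSrc tr (x : xs) (Cont k) = do
  modifyIORef tr (++ ["read " ++ show x])
  k (Cons x (tracedSrc tr xs))

-- Hypothetical stand-in for a file sink: records each write.
tracedSnk :: IORef [String] -> Snk Int
tracedSnk _  Nil        = return ()
tracedSnk tr (Cons x s) = do
  modifyIORef tr (++ ["write " ++ show x])
  s (Cont (tracedSnk tr))

-- Connect both ends and return the order in which effects ran.
copyTrace :: [Int] -> IO [String]
copyTrace xs = do
  tr <- newIORef []
  fwd (tracedSrc tr xs) (tracedSnk tr)
  readIORef tr
```

Under these assumptions, `copyTrace [1, 2]` returns `["read 1", "write 1", "read 2", "write 2"]`: every element is written before the next one is read.
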
Whenever a sink is full, the source connected to it should be finalized.
The next example shows what happens when a sink closes the stream
early. Instead of connecting the source to a bottomless sink, we
connect it to one which stops receiving input after three lines.
> read3Lines :: Eff
> read3Lines = fwd (hFileSrc stdin)
>                  (takeSnk 3 $ fileSnk "text.txt")
Indeed, testing the above program reveals that it properly closes
\var{stdin} after reading three lines. This early closing of sinks
allows modular stream programming. In particular, it is easy to
support proper finalization in the presence of exceptions, as the next
section shows.
Exception Handling
------------------
While the above implementations of file source and sink are fine for
illustrative purposes, their production-strength versions should
handle exceptions. Doing so is straightforward: as shown above, our
sinks and sources readily support early closing of the stream.
The following code fragment shows how to handle an exception when
reading a line in a file source.
> hFileSrcSafe :: Handle -> Src String
> hFileSrcSafe h Full = hClose h
> hFileSrcSafe h (Cont c) = do
>   e <- hIsEOF h
>   if e then do
>          hClose h
>          c Nil
>        else do
>          mx <- catch (Just <$> hGetLine h)
>                      (\(_ :: IOException) -> return Nothing)
>          case mx of
>            Nothing -> do hClose h  -- release the handle on a read error too
>                          c Nil
>            Just x -> c (Cons x $ hFileSrcSafe h)
Exceptions raised in \var{hIsEOF} should be handled in the same
way. The file sink is responsible for handling its own exceptions so
there is no need to insert a handler around the invocation of the
continuation \var{c}. One would probably have a field in both the
\var{Nil} and \var{Full} constructors indicating the nature of the
exception encountered, if any, but we will not bother in the proof of
concept implementation presented in this paper.
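To make the remark about error-carrying constructors concrete, here is a small sketch of what such a variant could look like. Everything in it is hypothetical: the modified \var{Source} and \var{Sink} types, the helper \var{tryIO}, and the source \var{actionSrc}, which runs one IO action per element and stands in for a file source. It is not part of the library presented in this paper.

```haskell
import Control.Exception (IOException, try)
import Data.IORef

type Eff = IO ()
type N a = a -> Eff

-- Hypothetical variant of the stream types: both end-of-stream markers
-- carry the exception (if any) that caused the stream to end.
data Source a = Nil (Maybe IOException) | Cons a (N (Sink a))
data Sink a = Full (Maybe IOException) | Cont (N (Source a))

type Src a = N (Sink a)
type Snk a = N (Source a)

-- 'try' specialised to IOException.
tryIO :: IO a -> IO (Either IOException a)
tryIO = try

-- A source that runs one IO action per element. If an action throws an
-- IOException, the stream ends with 'Nil' carrying that exception.
actionSrc :: [IO a] -> Src a
actionSrc _ (Full _) = return ()
actionSrc [] (Cont k) = k (Nil Nothing)
actionSrc (act : acts) (Cont k) = do
  r <- tryIO act
  case r of
    Left e -> k (Nil (Just e))
    Right x -> k (Cons x (actionSrc acts))

-- A sink that records the elements and the final status of the stream.
collectSnk :: IORef [a] -> IORef (Maybe IOException) -> Snk a
collectSnk _ status (Nil e) = writeIORef status e
collectSnk out status (Cons x rest) = do
  modifyIORef out (++ [x])
  rest (Cont (collectSnk out status))

-- The third action fails, so only two elements are delivered and the
-- sink learns why the stream ended.
demo :: IO ([Int], Maybe String)
demo = do
  out <- newIORef []
  status <- newIORef Nothing
  actionSrc [return 1, return 2, ioError (userError "boom"), return 4]
            (Cont (collectSnk out status))
  xs <- readIORef out
  e <- readIORef status
  return (xs, fmap show e)
```

In this sketch the sink receives the elements `1` and `2`, followed by a \var{Nil} that carries the exception; the fourth action is never run.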
Dealing with exceptions is done once and for all when implementing the
library of streams. The programmer using the library does not have to
be concerned with exceptions as they are caught and communicated
properly under the hood.
Using exception handlers, as in the above snippet, will secure the
library from synchronous exceptions arising from accessing the file,
but not from asynchronous exceptions which may come from other
sources. Asynchronous exception-safety requires more machinery. The
region library presented in \citet{kiselyov2008lightweight} can be
used for this purpose, as outlined in
\citet{kiselyov12:region_iteratees}.
Synchronicity and Asynchronicity
================================
\label{async}
One of the main benefits of streams as defined here is that the
programming interface is (or appears to be) asynchronous, while the
run-time behavior is synchronous.
That is, one can build a data source regardless of how the data is consumed,
or dually one can build a sink regardless of how the data is produced;
but, despite the independence of definitions, all the code can be (and
is) executed synchronously: composing a source and a sink requires no
concurrency (nor any external control structure).
As discussed above, a consequence of synchronicity is that the
programmer cannot implicitly buffer data when connecting a source
to a sink: every production must be matched by a consumption (and vice
versa). In sum, synchronicity restricts the kind of operations one
can construct, in exchange for two guarantees:
1. Execution of connected sources and sinks is synchronous
2. No implicit memory allocation happens
While the guarantees have been discussed so far, it may be unclear how
synchronicity actually restricts the programs one can write. In the
rest of the section we show by example how the restriction plays out.
Example: demultiplexing
-----------------------
One operation supported by synchronous behavior is the demultiplexing
of a source, by connecting it to two sinks.
> dmux' :: Src (Either a b) -> Snk a -> Snk b -> Eff
We can implement this demultiplexing operation as follows:
> dmux :: Source (Either a b) -> Sink a -> Sink b -> Eff
> dmux Nil ta tb = forward Nil ta <> forward Nil tb
> dmux (Cons ab c) ta tb = case ab of
>   Left a -> c $ Cont $ \src' -> case ta of
>     Full -> forward Nil tb <> plug src'
>     Cont k -> k (Cons a $ \ta' -> dmux src' ta' tb)
>   Right b -> c $ Cont $ \src' -> case tb of
>     Full -> forward Nil ta <> plug src'
>     Cont k -> k (Cons b $ \tb' -> dmux src' ta tb')
> dmux' sab' ta' tb' =
>   shiftSnk ta' $ \ta ->
>   shiftSnk tb' $ \tb ->
>   shiftSrc sab' $ \sab ->
>   dmux sab ta tb
The key ingredient is that demultiplexing starts by reading the next
value available on the source. Depending on its value, we feed the
data to one of the sinks and proceed. Besides, as soon as any of
the three parties closes the stream, the other two are notified.
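As an illustration, the following self-contained sketch runs the demultiplexer on a list. The definition of \var{dmux} is transcribed from the one above, with two assumptions: effects are sequenced with `>>` (taking \var{Eff} to be `IO ()`), and \var{plug} is assumed to close the remaining source, which we write as `forward src' Full`. The helpers \var{listSrc} and \var{collectSnk} are hypothetical and exist only for the demonstration.

```haskell
import Data.IORef

type Eff = IO ()
type N a = a -> Eff

data Source a = Nil | Cons a (N (Sink a))
data Sink a = Full | Cont (N (Source a))

type Src a = N (Sink a)
type Snk a = N (Source a)

forward :: Source a -> Sink a -> Eff
forward s (Cont k) = k s
forward Nil Full = return ()
forward (Cons _ rest) Full = rest Full

-- Transcription of dmux; 'forward src Full' plays the role assumed for 'plug'.
dmux :: Source (Either a b) -> Sink a -> Sink b -> Eff
dmux Nil ta tb = forward Nil ta >> forward Nil tb
dmux (Cons ab c) ta tb = case ab of
  Left a -> c $ Cont $ \src' -> case ta of
    Full -> forward Nil tb >> forward src' Full
    Cont k -> k (Cons a $ \ta' -> dmux src' ta' tb)
  Right b -> c $ Cont $ \src' -> case tb of
    Full -> forward Nil ta >> forward src' Full
    Cont k -> k (Cons b $ \tb' -> dmux src' ta tb')

dmux' :: Src (Either a b) -> Snk a -> Snk b -> Eff
dmux' src ta tb = src (Cont (\s -> dmux s (Cont ta) (Cont tb)))

-- Hypothetical list-backed source.
listSrc :: [a] -> Src a
listSrc _ Full = return ()
listSrc [] (Cont k) = k Nil
listSrc (x : xs) (Cont k) = k (Cons x (listSrc xs))

-- Hypothetical sink appending received elements to 'out'.
collectSnk :: IORef [a] -> Snk a
collectSnk _ Nil = return ()
collectSnk out (Cons x rest) = do
  modifyIORef out (++ [x])
  rest (Cont (collectSnk out))

-- Route the Left elements to one sink and the Right elements to another.
demo :: IO ([Int], String)
demo = do
  ls <- newIORef []
  rs <- newIORef []
  dmux' (listSrc [Left 1, Right 'a', Left 2, Right 'b'])
        (collectSnk ls) (collectSnk rs)
  (,) <$> readIORef ls <*> readIORef rs
```

With these definitions, \var{demo} yields `([1,2],"ab")`, and no thread or buffer is involved: each element is handed to its sink before the next one is routed.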
However, multiplexing sources cannot be implemented while respecting
synchronicity. To see why, let us attempt anyway, using the following
type signature:
< mux :: Src a -> Src b -> Src (Either a b)
< mux sa sb = ?
We can try to fill the hole by reading from a source. However, if we do
this, the choice of which source to run first falls to the
multiplexer. We may pick \var{sa}, but it may block while
\var{sb} is ready with data. This is not really multiplexing: at best,
this approach gives us an interleaving of the data sources, by taking
turns.
In order to make any progress, we can let the choice of which source
to pick fall on the consumer of the stream. The type that we need for
output data in this case is a so-called additive conjunction. It is
the dual of the \var{Either} type: there is a choice, but this choice
falls on the consumer rather than the producer of the data. Additive
conjunction, written &, can be encoded by sandwiching \var{Either}
between two inversions of the control flow, thus switching the party
who makes the choice:
> type a & b = N (Either (N a) (N b))
(One will recognize the similarity between this definition and
De Morgan's laws.)
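The following sketch shows how a value of this type is built and consumed. The names \var{offer}, \var{chooseLeft} and \var{chooseRight} are ours and purely illustrative. The producer must be ready to deliver either component, and the consumer selects one by supplying the corresponding continuation.

```haskell
{-# LANGUAGE TypeOperators #-}
import Data.IORef

type Eff = IO ()
type N a = a -> Eff

type a & b = N (Either (N a) (N b))

-- The producer offers both an 'a' and a 'b'; only the component that the
-- consumer asks for is delivered.
offer :: a -> b -> (a & b)
offer a _ (Left ka) = ka a
offer _ b (Right kb) = kb b

-- The consumer makes the choice by supplying one continuation.
chooseLeft :: (a & b) -> N a -> Eff
chooseLeft p ka = p (Left ka)

chooseRight :: (a & b) -> N b -> Eff
chooseRight p kb = p (Right kb)

-- 'p' contains no effectful continuation here, so using it twice is
-- harmless in this demonstration; a linear discipline would allow one use.
demo :: IO (Int, String)
demo = do
  l <- newIORef 0
  r <- newIORef ""
  let p = offer 42 "hello"
  chooseLeft p (writeIORef l)
  chooseRight p (writeIORef r)
  (,) <$> readIORef l <*> readIORef r
```

Here \var{demo} yields `(42,"hello")`. Contrast this with \var{Either}, where the producer decides which of the two alternatives the consumer receives.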
We can then amend the type of multiplexing:
> mux :: Src a -> Src b -> Src (a & b)
Unfortunately, we still cannot implement multiplexing typed as
above. Consider the following attempt, where we begin by asking the
consumer if it desires $a$ or $b$. If the answer is $a$,
we can extract a value from \var{sa} and yield it; and
symmetrically for $b$.
> mux sa sb (Cont tab) = tab $ Cons
>   (\ab -> case ab of
>      Left ka -> sa $ Cont $ \(Cons a resta) -> ka a
>      Right kb -> sb $ Cont $ \(Cons b restb) -> kb b)
>   (error "oops")
However, there is no way to then make a recursive call (`oops`) to
continue processing. Indeed the recursive call to make must depend on
the choice made by the consumer (in one case we should be using
\var{resta}, in the other \var{restb}). However the type of \var{Cons}
forces us to produce its arguments independently.
What we need to do is to reverse the control fully: we need a data
source which is in control of the flow of execution.
Co-Sources, Co-Sinks
-------------------
We call the structure that we are looking for a
*co-source*. Co-sources are the subject of this section. Recall
that producing $N a$ is equivalent to consuming $a$; thus a sink of $N
a$ is a (different kind of) source of $a$. We define:
> type CoSrc a = Snk (N a)
> type CoSnk a = Src (N a)
Implementing multiplexing on co-sources is then straightforward, by
leveraging \var{dmux'}:
> mux' :: CoSrc a -> CoSrc b -> CoSrc (a & b)
> mux' sa sb = unshiftSnk $ \tab -> dmux' (nnElim tab) sa sb
We use the rest of the section to study the properties of co-sources and
co-sinks.
\var{CoSrc} is a functor, and \var{CoSnk} is a contravariant functor.
> mapCoSrc :: (a -> b) -> CoSrc a -> CoSrc b
> mapCoSrc f = mapSnk (\b' -> \a -> b' (f a))
> mapCoSnk :: (b -> a) -> CoSnk a -> CoSnk b
> mapCoSnk f = mapSrc (\b' -> \a -> b' (f a))
Elements of a co-source are accessed only "one at a time". That is, one
cannot extract the contents of a co-source as a list. Attempting to
implement this extraction looks as follows.
> coToList :: CoSrc a -> NN [a]
> coToList k1 k2 = k1 $ Cons (\a -> k2 [a]) (error "rest")
> coToList k1 k2 = k2 $ (error "a?") : (error "rest")
If one tries to begin by eliminating the co-source (first equation), then there is no
way to produce subsequent elements of the list. If one tries to begin
by constructing the list (second equation), then no data is available.
Yet it is possible to define useful and effectful co-sources and
co-sinks. The first example shows how to provide a file as a co-source:
> coFileSrc :: Handle -> CoSrc String
> coFileSrc h Nil = hClose h
> coFileSrc h (Cons x xs) = do
>   e <- hIsEOF h
>   if e
>     then do
>       hClose h
>       xs Full
>     else do
>       x' <- hGetLine h
>       x x'                     -- (1)
>       xs $ Cont $ coFileSrc h  -- (2)
Compared to \var{fileSrc}, the difference is that this function can
decide the ordering of effects run in a co-sink connected to it. That is,
the lines (1) and (2) have no data dependency. Therefore they may be
run in any order. (Blindly doing so is a bad idea though, as the
\var{Full} action on the sink will be run before all other actions.)
We will see in the next section how this situation generalizes.
The second example is an infinite co-sink that sends data to a file.
> coFileSink :: Handle -> CoSnk String
> coFileSink h Full = hClose h
> coFileSink h (Cont c) = c (Cons (hPutStrLn h)
>                                 (coFileSink h))
Compared to \var{fileSnk}, the difference is that one does not control
the order of execution of effects. The effect of writing the current
line is put in a data structure, and its execution is up to the
co-source which eventually connects to the co-sink. Thus, the
order of writing lines in the file depends on the order of effects chosen
in the co-source connected to this co-sink.
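The same interplay can be reproduced in memory. In the sketch below, \var{listCoSrc} and \var{collectCoSnk} are hypothetical analogues of \var{coFileSrc} and \var{coFileSink}, backed by a list and an \var{IORef} instead of file handles; the stream types are restated as we read them from the code in this paper.

```haskell
import Data.IORef

type Eff = IO ()
type N a = a -> Eff

data Source a = Nil | Cons a (N (Sink a))
data Sink a = Full | Cont (N (Source a))

type Src a = N (Sink a)
type Snk a = N (Source a)
type CoSrc a = Snk (N a)
type CoSnk a = Src (N a)

fwd :: Src a -> Snk a -> Eff
fwd src snk = src (Cont snk)

-- Hypothetical list-backed co-source, shaped like coFileSrc: it receives
-- a callback for one element, and decides when to run it.
listCoSrc :: [a] -> CoSrc a
listCoSrc _ Nil = return ()
listCoSrc [] (Cons _ rest) = rest Full
listCoSrc (y : ys) (Cons k rest) = do
  k y                         -- (1) deliver the element
  rest (Cont (listCoSrc ys))  -- (2) ask for the next callback

-- Hypothetical co-sink, shaped like coFileSink: it does not write by
-- itself, but hands out a callback that performs the write.
collectCoSnk :: IORef [a] -> CoSnk a
collectCoSnk _ Full = return ()
collectCoSnk out (Cont c) =
  c (Cons (\x -> modifyIORef out (++ [x])) (collectCoSnk out))

demo :: IO String
demo = do
  out <- newIORef []
  fwd (collectCoSnk out) (listCoSrc "abc")
  readIORef out
```

Here \var{demo} yields `"abc"`. The order of the writes is determined by the order of lines (1) and (2) in the co-source, not by the co-sink.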
In sum, using co-sources and co-sinks shifts the flow of control from
the sink to the source. It should be stressed that, in the programs
which use the functions defined so far (even those that use IO),
synchronicity is preserved: no data is buffered implicitly, and
reading and writing are interleaved.
Asynchronicity
--------------
We have seen so far that synchronicity gives useful guarantees, but
restricts the kind of programs one can write. In this section, we
provide primitives which allow forms of asynchronous programming within
our framework.
The main benefit of sticking to our framework in this case is that
asynchronous behavior is confined to the explicit uses of these
primitives. That is, the benefits of synchronous programming still
hold locally.
\paragraph{Scheduling}
When converting a \var{Src} to a \var{CoSrc} (or dually \var{CoSnk} to
a \var{Snk}), we have two streams which are ready to respond to
pulling of data from them. This means that effects must be scheduled
explicitly, as we have seen in the example above, when manually converting
the file source to a file co-source.
In general, given a \var{Schedule}, we can implement the above two
conversions:
> srcToCoSrc :: Schedule a -> Src a -> CoSrc a
> coSnkToSnk :: Schedule a -> CoSnk a -> Snk a
We define a \var{Schedule} as the reconciliation between a source and a
co-sink:
> type Schedule a = Source a -> Source (N a) -> Eff
Implementing the conversions is then straightforward:
> srcToCoSrc strat s s0 = shiftSrc s $ \ s1 -> strat s1 s0
> coSnkToSnk strat s s0 = shiftSrc s $ \ s1 -> strat s0 s1
What are possible scheduling strategies? The simplest, and most
natural one is sequential execution: looping through both sources and
matching the consumptions/productions element-wise, as follows.
> sequentially :: Schedule a
> sequentially Nil Nil = mempty
> sequentially Nil (Cons _ xs) = xs Full
> sequentially (Cons _ xs) Nil = xs Full
> sequentially (Cons x xs) (Cons x' xs') =
>   x' x <> (shiftSrc xs $ \sa ->
>            shiftSrc xs' $ \sna ->
>            sequentially sa sna)
When effects are arbitrary IO actions, sequential execution is the
only sensible schedule: indeed, the sources and sinks expect their
effects to be run in the order prescribed by the stream. Swapping the
arguments to `<>` in the above means that \var{Full} effects will be
run first, spelling disaster.
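The sequential schedule can be exercised on in-memory streams. The sketch below is self-contained and makes a few assumptions: \var{Eff} is taken to be `IO ()`, so effects are sequenced inside a `do` block; a case for two empty streams is included so that the pattern match is exhaustive; and \var{listSrc} and \var{collectCoSnk} are hypothetical helpers. A source is connected to a co-sink, two pull streams, by converting the co-sink to a sink with \var{coSnkToSnk}.

```haskell
import Data.IORef

type Eff = IO ()
type N a = a -> Eff

data Source a = Nil | Cons a (N (Sink a))
data Sink a = Full | Cont (N (Source a))

type Src a = N (Sink a)
type Snk a = N (Source a)
type CoSnk a = Src (N a)

type Schedule a = Source a -> Source (N a) -> Eff

fwd :: Src a -> Snk a -> Eff
fwd src snk = src (Cont snk)

-- Match productions and consumptions element-wise, in stream order.
sequentially :: Schedule a
sequentially Nil Nil = return ()
sequentially Nil (Cons _ ks) = ks Full
sequentially (Cons _ xs) Nil = xs Full
sequentially (Cons x xs) (Cons k ks) = do
  k x
  xs (Cont (\sa -> ks (Cont (\sna -> sequentially sa sna))))

-- Pull one step from the co-sink, then let the schedule take over.
coSnkToSnk :: Schedule a -> CoSnk a -> Snk a
coSnkToSnk strat t s0 = t (Cont (\s1 -> strat s0 s1))

-- Hypothetical list-backed source.
listSrc :: [a] -> Src a
listSrc _ Full = return ()
listSrc [] (Cont k) = k Nil
listSrc (x : xs) (Cont k) = k (Cons x (listSrc xs))

-- Hypothetical co-sink handing out callbacks that append to 'out'.
collectCoSnk :: IORef [a] -> CoSnk a
collectCoSnk _ Full = return ()
collectCoSnk out (Cont c) =
  c (Cons (\x -> modifyIORef out (++ [x])) (collectCoSnk out))

demo :: IO [Int]
demo = do
  out <- newIORef []
  fwd (listSrc [1, 2, 3]) (coSnkToSnk sequentially (collectCoSnk out))
  readIORef out
```

Here \var{demo} yields `[1,2,3]`: when the source runs out, the schedule sends \var{Full} to the co-sink, which finalizes it.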
However, in certain cases running effects out of order may make
sense. If the monoid of effects is commutative (or if the programmer
is confident that execution order does not matter), one can shuffle
the order of execution of effects. This re-ordering can be taken
advantage of to run effects concurrently, as follows:
> concurrently :: Schedule a
> concurrently Nil Nil = return ()
> concurrently Nil (Cons _ xs) = xs Full
> concurrently (Cons _ xs) Nil = xs Full
> concurrently (Cons x xs) (Cons x' xs') = do
>   forkIO $ x' x
>   (shiftSrc xs $ \sa ->
>    shiftSrc xs' $ \sna ->
>    concurrently sa sna)
The above strategy is useful if the production or consumption
of elements is expensive and distributable over computation units.
While the above implementation naively spawns a thread for every
element, in reality one will most likely want to divide the stream
into chunks before spawning threads. Because strategies are separate
components, a bad choice is easily remedied by swapping one
strategy for another.
\paragraph{Buffering}
Consider now the situation where one needs to convert from a
\var{CoSrc} to a \var{Src} (or from a \var{Snk} to a \var{CoSnk}).
Here, we have two streams, both of which want to control the execution
flow. The conversion can only be implemented by running both streams
in concurrent threads, and having them communicate via some form of
buffer. A form of buffer that we have seen before is the file. Using
it yields the following buffering implementation:
> fileBuffer :: String -> CoSrc String -> Src String
> fileBuffer tmpFile f g = do
>   h' <- openFile tmpFile WriteMode
>   forkIO $ fwd (coFileSink h') f
>   h <- openFile tmpFile ReadMode
>   hFileSrc h g
If the temporary file is a regular file, the above implementation is
likely to fail. For example the reader may be faster than the writer
and reach an end of file prematurely. Thus the temporary file should
be a UNIX pipe. One then faces the issue that UNIX pipes are of fixed
maximum size, and if the writer overshoots the capacity of the pipe, a
deadlock will occur.
Thus, one may prefer to use Concurrent Haskell channels as a buffering
means, as they are bounded only by the size of the memory and do not
rely on any special feature of the operating system:
> chanCoSnk :: Chan a -> CoSnk a
> chanCoSnk _ Full = return ()
> chanCoSnk h (Cont c) = c (Cons (writeChan h)
>                                (chanCoSnk h))
> chanSrc :: Chan a -> Src a
> chanSrc _ Full = return ()
> chanSrc h (Cont c) = do
>   x <- readChan h
>   c (Cons x $ chanSrc h)
> chanBuffer :: CoSrc a -> Src a
> chanBuffer f g = do
>   c <- newChan
>   forkIO $ fwd (chanCoSnk c) f
>   chanSrc c g
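The definitions above describe a buffer over an unending stream: \var{chanSrc} never produces \var{Nil}. To observe the buffer on a finite stream, the sketch below uses a hypothetical variant in which the channel carries `Maybe a` and `Nothing` marks the end of the stream. This end-of-stream convention, together with the helpers \var{listCoSrc} and \var{collectSnk}, is an assumption made for the demonstration and not the paper's definition.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Data.IORef

type Eff = IO ()
type N a = a -> Eff

data Source a = Nil | Cons a (N (Sink a))
data Sink a = Full | Cont (N (Source a))

type Src a = N (Sink a)
type Snk a = N (Source a)
type CoSrc a = Snk (N a)
type CoSnk a = Src (N a)

fwd :: Src a -> Snk a -> Eff
fwd src snk = src (Cont snk)

-- Variant of chanCoSnk: when the co-source is exhausted it sends 'Full',
-- which we translate into an end-of-stream marker on the channel.
chanCoSnk :: Chan (Maybe a) -> CoSnk a
chanCoSnk ch Full = writeChan ch Nothing
chanCoSnk ch (Cont c) = c (Cons (writeChan ch . Just) (chanCoSnk ch))

-- Variant of chanSrc: the end-of-stream marker becomes 'Nil'.
chanSrc :: Chan (Maybe a) -> Src a
chanSrc _ Full = return ()
chanSrc ch (Cont c) = do
  mx <- readChan ch
  case mx of
    Nothing -> c Nil
    Just x -> c (Cons x (chanSrc ch))

-- The writer runs in its own thread; the reader runs in the caller's.
chanBuffer :: CoSrc a -> Src a
chanBuffer f g = do
  ch <- newChan
  _ <- forkIO (fwd (chanCoSnk ch) f)
  chanSrc ch g

-- Hypothetical list-backed co-source.
listCoSrc :: [a] -> CoSrc a
listCoSrc _ Nil = return ()
listCoSrc [] (Cons _ rest) = rest Full
listCoSrc (y : ys) (Cons k rest) = k y >> rest (Cont (listCoSrc ys))

-- Hypothetical sink appending received elements to 'out'.
collectSnk :: IORef [a] -> Snk a
collectSnk _ Nil = return ()
collectSnk out (Cons x rest) = do
  modifyIORef out (++ [x])
  rest (Cont (collectSnk out))

demo :: IO [Int]
demo = do
  out <- newIORef []
  fwd (chanBuffer (listCoSrc [1, 2, 3])) (collectSnk out)
  readIORef out
```

Here \var{demo} yields `[1,2,3]`. The two push streams (the co-source and the sink) each keep control of their own thread, and the channel absorbs the difference in pace. The sketch does not stop the writer when the reader closes early.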
Note that it is easy to create a bounded buffer, by guarding the
writes with a semaphore. In general there is no issue with blocking
reads or writes. The implementation follows.
> chanCoSnk' :: Chan a -> QSem -> CoSnk a
> chanCoSnk' _ _ Full = return ()
> chanCoSnk' h s (Cont c) = c (Cons write
>                                   (chanCoSnk' h s))
>   where write x = do
>           waitQSem s
>           writeChan h x
> chanSrc' :: Chan a -> QSem -> Src a
> chanSrc' _ _ Full = return ()
> chanSrc' h s (Cont c) = do
>   x <- readChan h
>   signalQSem s
>   c (Cons x $ chanSrc' h s)
> boundedChanBuffer :: Int -> CoSrc a -> Src a
> boundedChanBuffer n f g = do
>   c <- newChan
>   s <- newQSem n
>   forkIO $ fwd (chanCoSnk' c s) f
>   chanSrc' c s g
In certain situations (for example for a stream yielding a status
whose history does not matter, like mouse positions) one may want to
ignore all but the latest datum. In this case a single memory cell can
serve as buffer:
> varCoSnk :: IORef a -> CoSnk a
> varCoSnk _ Full = return ()
> varCoSnk h (Cont c) = c (Cons (writeIORef h)
>                               (varCoSnk h))
> varSrc :: IORef a -> Src a
> varSrc _ Full = return ()
> varSrc h (Cont c) = do
>   x <- readIORef h
>   c (Cons x $ varSrc h)
> varBuffer :: a -> CoSrc a -> Src a
> varBuffer a f g = do
>   c <- newIORef a
>   forkIO $ fwd (varCoSnk c) f
>   varSrc c g
All the above buffering operations work on sources, but they can be generically
inverted to work on sinks, as follows.
> flipBuffer :: (forall a. CoSrc a -> Src a) -> Snk b -> CoSnk b
> flipBuffer f s = f (nnIntro' s)
Summary
-------
In sum, we can classify streams according to polarity:
- Pull: sources and co-sinks
- Push: sinks and co-sources
We then have three situations when composing stream processors:
1. Matching polarities (one pull, one push). In this case behavior is
synchronous; no concurrency appears.
2. Two pull streams. In this case an explicit loop must process the
streams. If effects commute, the programmer may run effects out of
order, potentially concurrently.
3. Two push streams. In this case the streams must run in independent
threads, and the programmer needs to make a choice for the communication
buffer. One needs to be careful: if the buffer is too small, a deadlock
may occur.
Therefore, when programming with streams, one should consume push
types when one can, and pull ones when one must. Conversely, one
should produce pull types when one can, and push ones when one
must.
App: idealized echo server
---------------------
We finish the exposition of asynchronous behavior with a small program
sketching the skeleton of a client-server application. This is a small
server with two clients, which echoes the requests of each client to
both of them.
The server communicates with each client via two streams, one for
inbound messages, one for outbound ones. We want each client to be
able to send and receive messages in the order that they like. That
is, from their point of view, they are in control of the message
processing order. Hence a client should have a co-sink for sending
messages to the server, and a source for receiving them. On the
server side, types are dualized and thus, a client is represented by a
pair of a co-source and a sink:
> type Client a = (CoSrc a, Snk a)
For simplicity we implement a chat server handling exactly two
clients.
The first problem is to multiplex the inputs of the clients. In the
server, we do not actually want any client to be controlling the
processing order. Hence we have to multiplex the messages in real time,
using a channel (note the similarity with \var{chanBuffer}):
> bufferedDmux :: CoSrc a -> CoSrc a -> Src a
> bufferedDmux s1 s2 t = do
>   c <- newChan
>   forkIO $ fwd (chanCoSnk c) s1
>   forkIO $ fwd (chanCoSnk c) s2
>   chanSrc c t
We then have to send each message to both clients. This may be done
using the following effect-free function, which forwards everything
sent to a sink to its two argument sinks.
> collapseSnk :: Snk a -> Snk a -> Snk a
> collapseSnk t1 t2 Nil = t1 Nil <> t2 Nil
> collapseSnk t1 t2 (Cons x xs)
>   = t1 (Cons x $ \c1 ->
>     t2 (Cons x $ \c2 ->
>     shiftSrc xs (collapseSnk (flip forward c1)
>                              (flip forward c2))))
The server can then be defined by composing the above two functions.
> server :: Client a -> Client a -> Eff
> server (i1,o1) (i2,o2) = fwd (bufferedDmux i1 i2)
>                              (collapseSnk o1 o2)
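The broadcasting half of the server involves no concurrency and can be tested in isolation. The sketch below transcribes \var{collapseSnk} (with `>>` in place of `<>`, assuming \var{Eff} is `IO ()`) and connects it to a hypothetical list source and two hypothetical collecting sinks, which stand in for the two clients.

```haskell
import Data.IORef

type Eff = IO ()
type N a = a -> Eff

data Source a = Nil | Cons a (N (Sink a))
data Sink a = Full | Cont (N (Source a))

type Src a = N (Sink a)
type Snk a = N (Source a)

fwd :: Src a -> Snk a -> Eff
fwd src snk = src (Cont snk)

forward :: Source a -> Sink a -> Eff
forward s (Cont k) = k s
forward Nil Full = return ()
forward (Cons _ rest) Full = rest Full

shiftSrc :: Src a -> N (Source a) -> Eff
shiftSrc src k = src (Cont k)

-- Forward everything received to both argument sinks.
collapseSnk :: Snk a -> Snk a -> Snk a
collapseSnk t1 t2 Nil = t1 Nil >> t2 Nil
collapseSnk t1 t2 (Cons x xs) =
  t1 (Cons x $ \c1 ->
    t2 (Cons x $ \c2 ->
      shiftSrc xs (collapseSnk (flip forward c1) (flip forward c2))))

-- Hypothetical list-backed source.
listSrc :: [a] -> Src a
listSrc _ Full = return ()
listSrc [] (Cont k) = k Nil
listSrc (x : xs) (Cont k) = k (Cons x (listSrc xs))

-- Hypothetical sink appending received elements to 'out'.
collectSnk :: IORef [a] -> Snk a
collectSnk _ Nil = return ()
collectSnk out (Cons x rest) = do
  modifyIORef out (++ [x])
  rest (Cont (collectSnk out))

demo :: IO (String, String)
demo = do
  o1 <- newIORef []
  o2 <- newIORef []
  fwd (listSrc "hi") (collapseSnk (collectSnk o1) (collectSnk o2))
  (,) <$> readIORef o1 <*> readIORef o2
```

Here \var{demo} yields `("hi","hi")`: every message reaches both sinks, in lockstep.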
Related Work
============
Polarities, data structures and control
---------------------------------------
One of the key ideas formalized in this paper is to classify streams by
polarity. The push polarity (sinks, co-sources) controls the execution
thread, whereas the pull one (sources, co-sinks) provides
data. This idea has recently been taken advantage of to
bring efficient array programming facilities to functional programming
\citep{bernardy_composable_2015,claessen2012expressive,ankner_edsl_2013}.
This concept is central in the literature on Girard's linear logic
\citep{laurent_etude_2002,zeilberger_logical_2009}. However, in the
case of streams, this idea dates back at least to
\citet{jackson_principles_1975} (\citet{kay_you_2008} gives a good
summary of Jackson's insight).
Our contribution is to bring this idea to stream programming in
Haskell. (While duality was used for Haskell array programming, it has
not been taken advantage of for stream programming.) We believe that our
implementation brings together the practical applications that Jackson
intended, while being faithful to the theoretical foundations in
logic, via the double-negation embedding.
Iteratees
---------
We consider that the state of the art in Haskell stream processing is
embodied by Kiselyov's iteratees \citeyear{kiselyov_iteratees_2012}.
The type for iteratees can be given the following definition:
> data I s m a = Done a | GetC (Maybe s -> m (I s m a))
An iteratee $I\,s\,m\,a$ roughly corresponds to a sink of $s$ which also
returns an $a$ --- but it uses a monad $m$ rather than a monoid
\var{Eff} for effects.
The above type contains a continuation in the \var{GetC}
constructor. Therefore, one must be careful about discarding or
duplicating iteratees. Hence, such libraries typically provide
higher-level interfaces to discourage non-linear usages.
A first advantage of our approach is the formulation and emphasis on
the linearity constraint, which is central to correct use of effectful
continuations. It appears that variants of iteratees (including the
*pipes* library) make the representation abstract, but at the cost of
a complex interface for programming them. By stating the linearity
requirement no complex abstract API is necessary to guarantee safety.
A second advantage of our library is that effects are not required to
be monads. Indeed, the use of continuations already provides the
necessary structure to combine computations (recall in particular
that double negation is already a monad). We believe that having a
single way to bind intermediate results (continuations vs. both
continuations and monads) is a simplification in design, which may make
our library more approachable.
The presence of sources and sinks also clarifies how to build complex
programs from basic blocks. Indeed, iteratee-based libraries
make heavy use of the following types:
> type Enumerator el m a = I el m a -> m (I el m a)
> type Enumeratee elo eli m a =
>   I eli m a -> I elo m (I eli m a)
It is our understanding that these types make up for the lack of explicit
sources by putting iteratees (sinks) on the left-hand-side of an
arrow. Enumerators are advantageously replaced by sources, and
enumeratees by simple functions from source to source (or sink to
sink).
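For readers unfamiliar with iteratees, the following sketch shows the type \var{I} in use. It is our own minimal illustration and does not reproduce the API of any iteratee library: \var{sumI}, \var{enumList} and \var{run} are hypothetical names. The enumerator \var{enumList} has the shape of the \var{Enumerator} type above, that is, a function from iteratee to iteratee in the monad.

```haskell
data I s m a = Done a | GetC (Maybe s -> m (I s m a))

-- An iteratee that sums its input until the end of the stream.
sumI :: Monad m => Int -> I Int m Int
sumI acc = GetC step
  where
    step Nothing = return (Done acc)
    step (Just x) = return (sumI (acc + x))

-- An enumerator: feed the elements of a list to an iteratee, then signal
-- the end of the stream. It plays the role that a source plays in our setting.
enumList :: Monad m => [s] -> I s m a -> m (I s m a)
enumList _ (Done a) = return (Done a)
enumList [] (GetC k) = k Nothing
enumList (x : xs) (GetC k) = k (Just x) >>= enumList xs

-- Extract the result, signalling end of stream if the iteratee still
-- expects input.
run :: Monad m => I s m a -> m (Maybe a)
run (Done a) = return (Just a)
run (GetC k) = do
  i <- k Nothing
  case i of
    Done a -> return (Just a)
    GetC _ -> return Nothing

demo :: IO (Maybe Int)
demo = enumList [1, 2, 3] (sumI 0) >>= run
```

Here \var{demo} yields `Just 6`. Note that nothing prevents a careless program from feeding the same \var{GetC} continuation twice, which is the linearity concern raised above.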
A third advantage of our approach is that the need for buffering (or
the opportunity for scheduling) is clearly indicated by the type
system, as mismatching polarities.
In more recent work \citet{kiselyov_lazy_2012} present a
continuation-based pretty printer, which fosters a more stylized use
of continuations, closer to what we advocate here. Producers and
consumers (sources and sinks) are defined more simply, using types
which correspond more directly to negations:
< type GenT e m = ReaderT (e -> m ()) m
< type Producer m e = GenT e m ()
< type Consumer m e = e -> m ()
< type Transducer m1 m2 e1 e2 =
< Producer m1 e1 -> Producer m2 e2
Yet, in that work, linearity is only briefly mentioned; the use of a
monad rather than a monoid persists; and mismatching polarities are not
discussed, let alone taken advantage of.
Several production-strength libraries have been built upon the concept
of iteratees, including *pipes* \citep{gonzalez_pipes_2015},
*conduits* \citep{snoyman_conduit_2015} and *machines*
\citep{kmett_machines_2015}. While we focus our comparison on
iteratees, most of our analysis carries over to the production libraries.
There is additionally a large body of non peer-reviewed literature
discussing and analyzing either iteratees or its variants. The
proliferation of libraries for IO streaming in Haskell indicates that
a unifying foundation for them is needed, and we hope that the present
paper provides a basis for such a foundation.
Feldspar monadic streams
------------------------
Feldspar, a DSL for digital signal processing, has a notion of streams
built on monads \citep{axelsson_feldspar_2010,svenningsson15:monadic_streams}. In Haskell
the stream type can be written as follows:
< type Stream a = IO (IO a)
Intuitively the outer monad can be understood as performing
initialization which creates the inner monadic computation. The inner
computation is called iteratively to produce the elements of the
stream.
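The following sketch illustrates this reading of the type. It is written in plain Haskell rather than in Feldspar, and the functions \var{countFrom}, \var{mapS} and \var{takeS} are hypothetical names chosen for the example.

```haskell
import Data.IORef

type Stream a = IO (IO a)

-- The outer action allocates a counter (initialization); the inner
-- action returns the current value and advances the counter.
countFrom :: Int -> Stream Int
countFrom n = do
  ref <- newIORef n
  return $ do
    x <- readIORef ref
    writeIORef ref (x + 1)
    return x

-- Apply a function to every element of the stream.
mapS :: (a -> b) -> Stream a -> Stream b
mapS f = fmap (fmap f)

-- Initialize the stream once, then call the inner action n times.
takeS :: Int -> Stream a -> IO [a]
takeS n s = do
  next <- s
  sequence (replicate n next)

demo :: IO [Int]
demo = takeS 3 (mapS (* 2) (countFrom 1))
```

Here \var{demo} yields `[2,4,6]`. The consumer pulls elements by running the inner action, which is why this representation corresponds to a source.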
Compared to the representation in the present paper, monadic
streams have only one form of stream, corresponding to a source. Also,
there is no support for timely release of resources: such things need
to be dealt with outside of the stream framework. Additionally, even
conceptually effect-free streams rely on running IO effects.
Session Types
-------------
In essence our pair of types for streams is an encoding of a protocol
for data transmission. This protocol is readily expressible using
linear types, following the ideas of \citet{wadler_propositions_2012}
and \citet{caires_concurrent_2012}:
< Source a = 1 ⊕ (a ⊗ N (Sink a))
< Sink a = 1 ⊕ N (Source a)
For the translation to Haskell, we have chosen to use a lightweight
encoding, assuming linearity of effectful variables; arguing at the
same time for support of linearity in future Haskell versions. Yet,
other encodings could be chosen. For example, we could have used the
technique of Pucella and Tov (Haskell session types with almost no
class), which does not require abiding by linearity.
Future Work
===========
As we see it, a natural next step for the present work is to show that
intermediate sources and sinks can be deforested. As it stands, we
believe that a standard approach
\cite{gill_short_1993,svenningsson_shortcut_2002,coutts_stream_2007}
should work: (1) encode sources (and sinks) as non-recursive data types;
(2) show that standard evaluation rules remove the intermediate
occurrences of the encoded types. However, this work has not been
carried out yet.
The duality principle exposed here has already been taken advantage of
to support fusible array types \cite{bernardy_composable_2015,ankner_edsl_2013}. The
present paper has shown how to support effectful stream
computations. One would naturally think that the same principle can be
applied to other lazily-evaluated data structures, such as the game
trees discussed by \citet{hughes_functional_1989}: as far as we know
this remains to be investigated.
Another line of development would be to implement language support for
the linearity convention, directly in Haskell. There have been many
proposals to extend functional languages with linear types (see for
example \cite[Ch. 9]{tov_practical_2012} for a survey). These
proposals are often rather involved, because they typically support
advanced forms of polymorphism allowing to abstract over the linearity
of an argument. The linearity convention that we use here calls for no
such complexity, therefore we hope it may be enough of a motivation to
add simple linear type support in research-grade Haskell compilers.
Conclusion
==========
We have cast a new light on the current state of coroutine-based
computation in Haskell, by drawing inspiration
from classical linear logic. We have further shown that the concepts
of duality and polarity provide design principles to structure
continuation-based code. In particular, we have shown that mismatches
in polarity correspond to buffers and control structures, depending on
the kind of mismatch.
Using effectful continuations is not a new idea; in fact it was the
standard way of writing effectful programs in Haskell 1.2. Later
versions of Haskell switched to a monadic approach. However, given the
issues outlined in the introduction, and especially the error-prone
character of lazy monadic IO, many libraries have reverted to explicitly
using co-routines.
A possible reason for selecting monads over co-routines is that monads
are rooted in solid theory (categories). However, we hope to have
shown that co-routines are also rooted in solid theory, namely
linear logic. If Haskell had support for linear types, co-routines
could be used safely, without the quirks of lazy IO.
\acks
We gratefully thank Koen Claessen, Atze van der Ploeg and Nicolas
Pouillard for feedback on drafts of this paper. The source code for
this paper is a literate Haskell file, whose latest version is
available at this url:
https://gist.github.com/jyp/fadd6e8a2a0aa98ae94d
The paper is typeset
using pandoc, lhs2TeX and latex.
\bibliographystyle{abbrvnat}
\bibliography{PaperTools/bibtex/jp,js}
\appendix
Table of Functions: implementations
===================================
> zipSrc s1 s2 t3 = shiftSrc s2 $ \s ->
>   unshiftSrc (\t -> forkSnk t s1 s) t3
> forkSnk sab ta tb =
>   shiftSrc ta $ \ta' ->
>   case ta' of
>     Nil -> (forward tb) Full <> sab Nil
>     Cons a as -> case tb of
>       Nil -> as Full <> sab Nil
>       Cons b bs -> fwd (cons (a,b) $ zipSrc as bs) sab
> forkSrc sab ta tb
>   = shiftSnk (zipSnk ta (flip forward tb)) sab
> zipSnk sa sb Nil = sa Nil <> sb Nil
> zipSnk sa sb (Cons (a,b) tab) = sa $ Cons a $ \sa' ->
>   sb $ Cons b $ \sb' ->
>   forkSrc tab (flip forward sa') sb'
> scanSrc f !z = flipSnk (scanSnk f z)
> scanSnk _ _ snk Nil = snk Nil
> scanSnk f z snk (Cons a s) = snk $ Cons next $
>                              scanSrc f next s
>   where next = f z a
> foldSrc' f !z s nb = s (Cont (foldSnk' f z nb))
> foldSnk' _ z nb Nil = nb z
> foldSnk' f z nb (Cons a s) = foldSrc' f (f z a) s nb
Return the last element of the source, or the first argument if the
source is empty.
> lastSrc :: a -> Src a -> NN a
> lastSrc x s k = shiftSrc s $ \s' -> case s' of
>   Nil -> k x
>   Cons x' cs -> lastSrc x' cs k
> dropSrc i = flipSnk (dropSnk i)
> dropSnk 0 s s' = s s'
> dropSnk _ s Nil = s Nil
> dropSnk i s (Cons _ s') = shiftSrc (dropSrc (i-1) s') s
> fromList = foldr cons empty
> enumFromToSrc :: Int -> Int -> Src Int
> enumFromToSrc _ _ Full = mempty
> enumFromToSrc b e (Cont s)
>   | b > e     = s Nil
>   | otherwise = s (Cons b (enumFromToSrc (b+1) e))
> enumFromToSrc' :: Int -> Int -> CoSrc Int
> enumFromToSrc' _ _ Nil = mempty
> enumFromToSrc' from to (Cons x xs)
>   | from > to = xs Full
>   | otherwise = do
>       x from
>       let !from' = from+1
>       shiftSnk (enumFromToSrc' from' to) xs
> linesSrc = flipSnk unlinesSnk
> unlinesSnk = unlinesSnk' []
> unlinesSnk' :: String -> Snk String -> Snk Char
> unlinesSnk' acc s Nil = s (Cons (reverse acc) empty)
> unlinesSnk' acc s (Cons '\n' s') = s (Cons (reverse acc)
>                                            (linesSrc s'))
> unlinesSnk' acc s (Cons c s')
>   = s' (Cont $ unlinesSnk' (c:acc) s)
> untilSnk _ Nil = mempty
> untilSnk p (Cons a s)
>   | p a  = s Full
>   | True = s (Cont (untilSnk p))
> interleave s1 s2 Full = s1 Full <> s2 Full
> interleave s1 s2 (Cont s) = s1 (Cont (interleaveSnk s s2))
> interleaveSnk snk src Nil = fwd src snk
> interleaveSnk snk src (Cons a s)
>   = snk (Cons a (interleave s src))
> tee s1 t1 = flipSnk (collapseSnk t1) s1
> filterSrc p = flipSnk (filterSnk p)
> filterSnk _ snk Nil = snk Nil
> filterSnk p snk (Cons a s)
>   | p a       = snk (Cons a (filterSrc p s))
>   | otherwise = s (Cont (filterSnk p snk))
> unchunk = flipSnk chunkSnk
> chunkSnk s Nil = s Nil
> chunkSnk s (Cons x xs)
>   = fwd (fromList x `appendSrc` unchunk xs) s
> toList s k = shiftSrc s (toListSnk k)
> toListSnk :: N [a] -> Snk a
> toListSnk k Nil = k []
> toListSnk k (Cons x xs) = toList xs $ \xs' -> k (x:xs')
Proofs
======
The laws can be proved by induction, for finite streams. The following
reasoning is only fast-and-loose in the infinite case, but morally
correct \citep{danielsson_fast_2006}.
Associativity
-------------
\var{Nil} case.
< ((t1 <> t2) <> t3) Nil
< == -- by def
< (t1 <> t2) Nil <> t3 Nil
< == -- by def
< (t1 Nil <> t2 Nil) <> t3 Nil
< == -- by associativity of effect composition
< t1 Nil <> (t2 Nil <> t3 Nil)
< == -- by def
< t1 Nil <> ((t2 <> t3) Nil)
< == -- by def
< (t1 <> (t2 <> t3)) Nil
\var{Cons} case.
< ((t1 <> t2) <> t3) (Cons a s0)
< == -- by def
< (t1 <> t2) (Cons a (t3 -! s0))
< == -- by def
< t1 (Cons a (t2 -! (t3 -! s0)))
< == -- by IH
< t1 (Cons a ((t2 <> t3) -! s0))
< == -- by def
< (t1 <> (t2 <> t3)) (Cons a s0)
\var{Full} case.
< ((t1 <> t2) -! s) Full
< == -- by def
< s (Cont (t1 <> t2))
< == -- by def
< (t2 -! s) (Cont t1)
< == -- by def
< (t1 -! (t2 -! s)) Full
\var{Cont} case.
< ((t1 <> t2) -! s) (Cont t0)
< == -- by def
< s (Cont (t0 <> (t1 <> t2)))
< == -- by IH
< s (Cont ((t0 <> t1) <> t2))
< == -- by def
< (t2 -! s) (Cont (t0 <> t1))
< == -- by def
< (t1 -! (t2 -! s)) (Cont t0)
Difference laws
---------------
Let us show only the case for sources, the case for sinks being
similar.
The \var{Full} case relies on the monoidal structure of effects:
< ((s1 <> s2) <> s3) Full
< == -- by def
< (s1 <> s2) Full <> s3 Full
< == -- by def
< (s1 Full <> s2 Full) <> s3 Full
< == -- \var{Eff} is a monoid
< s1 Full <> (s2 Full <> s3 Full)
< == -- by def
< s1 Full <> (s2 <> s3) Full
< == -- by def
< (s1 <> (s2 <> s3)) Full
The \var{Cont} case uses mutual induction:
< ((s1 <> s2) <> s3) (Cont k)
< == -- by def
< (s1 <> s2) (Cont (k -? s3))
< == -- by def
< s1 (Cont ((k -? s3) -? s2))
< == -- mutual IH
< s1 (Cont (k -? (s2 <> s3)))
< == -- by def
< (s1 <> (s2 <> s3)) (Cont k)
The \var{Cons} case uses mutual induction:
< ((k -? s2) -? s1) (Cons a s0)
< == -- by def
< (k -? s2) (Cons a (s0 <> s1))
< == -- by def
< k (Cons a ((s0 <> s1) <> s2))
< == -- mutual IH
< k (Cons a (s0 <> (s1 <> s2)))
< == -- by def
< (k -? (s1 <> s2)) (Cons a s0)
(We omit the \var{Nil} case; it is similar to the \var{Full} case)
<!--
-- LocalWords: forkIO readChan writeChan newChan Applicative IORef
-- LocalWords: coroutine Coroutines hughes compositionality inFile
-- LocalWords: effectful kiselyov openFile ReadMode hGetContents NN
-- LocalWords: putStr hClose Girard fileSrc stdoutSnk stdout Src ap
-- LocalWords: compositional mempty mappend Dually involutive Snk
-- LocalWords: unshift monadic versa forkSrc textit iff rw eof pre
-- LocalWords: consumptions onSource onSink unshiftSnk unshiftSrc
-- LocalWords: shiftSnk shiftSrc kk flipSnk flipSrc mapSrc mapSnk
-- LocalWords: snk formers dnintro dndel duals takeSrc takeSnk th
-- LocalWords: monoidal appendSrc appendSnk forwardThenSnk src IH
-- LocalWords: forwardThenSrc infixr infixl equalities morphism ss
-- LocalWords: contravariant concatSrcSrc concatSnkSrc concatAux mx
-- LocalWords: TODO ssrc monads comonads comonad counit contramap
-- LocalWords: sinkToSnk superclasses josef subclasses zipWith Sym
-- LocalWords: zipSrc forkSnk zipSnk scanl scanSrc scanSnk foldl yy
-- LocalWords: foldSrc foldSnk dropSrc dropSnk fromList toList ret
-- LocalWords: linesSrc unlinesSnk untilSnk interleaveSnk filterSrc
-- LocalWords: filterSnk unchunk chunkSnk claessen newtype forall
-- LocalWords: longestResultSnk mres hFileSnk hPutStrLn fileSnk txt
-- LocalWords: FilePath WriteMode hFileSrc hIsEOF hGetLine copyFile
-- LocalWords: stdin hFileSrcSafe IOException Asynchronicity dually
-- LocalWords: demultiplexing dmux tb sab mux sa sb De ka resta kb
-- LocalWords: restb CoSrc CoSnk mapCoSrc mapCoSnk coToList strat
-- LocalWords: coFileSrc coFileSink srcToCoSrc coSnkToSnk sna tmp
-- LocalWords: distributable fileBuffer chanCoSnk chanSrc varCoSnk
-- LocalWords: chanBuffer writeIORef varSrc readIORef varBuffer kay
-- LocalWords: newIORef flipBuffer dualized bufferedDmux laurent el
-- LocalWords: collapseSnk zeilberger jackson Jacksons FeldSpar DSL
-- LocalWords: svenningsson iteratively Iteratees Kiselyov's GetC
-- LocalWords: iteratees citeyear iteratee Enumeratee elo eli GenT
-- LocalWords: enumeratees ReaderT caires Pucella Tov coutts tov js
-- LocalWords: acks url pandoc lhs bibliographystyle abbrvnat bs nb
-- LocalWords: PaperTools lastSrc foldr Kiselov's natively runtime
-- LocalWords: async algorithmically tmpFile gonzalez snoyman kmett
-- LocalWords: Atze der Ploeg enumFromToSrc ScopedTypeVariables acc
-- LocalWords: TypeOperators RankNTypes LiberalTypeSynonyms reify
-- LocalWords: BangPatterns TypeSynonymInstances FlexibleInstances
-- LocalWords: pipelining demultiplexed nnIntro nnElim QSem newQSem
-- LocalWords: ankner edsl axelsson toListSnk waitQSem signalQSem
-->
<!--
-- LocalWords: maccagnoni formulae smallcaps mellis mazurak CIH zz
-- LocalWords: boundedChanBuffer danielsson
-->