The first concept we want to model in conduit is that of a Sink. The motivation is simple: we should be able to send it a stream of values, followed by some terminating signal to indicate that the stream is closed, and it will return some kind of result. In some cases, we don't actually care about that result. For example, with sinkFile, the result will be ().
The mirror concept to a Sink is a Source. It wants to produce a stream of values, and send a terminating signal when no more data is available. As described, it's pretty simple to connect these two to each other. Let's get into some of the details.
Some Sinks won't consume all their input. For example, we might have a Sink that consumes the first 100 bytes from a stream, writes them to a file, and then finishes. In such a case, we don't want the Source to read any more data after the Sink is complete. A Sink indicates that it's done processing by sending a return value, very similar to how a Source will send a termination signal. (Note the parallel there; we'll come back to it.)
So when a Sink completes before the Source sends a termination signal, the Source needs to finalize any resources it holds immediately. There's also no longer any reason for it to send a termination signal to the Sink, since the Sink isn't listening.
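
To make the early-termination rule concrete, here is a minimal sketch. The types Source and Sink below are simplified stand-ins invented for this illustration (they are not conduit's real definitions), and connect, countFrom, takeN and demo are likewise hypothetical names. The idea is that every value a Source yields comes paired with a finalizer, and that finalizer is run only when the Sink finishes first.

```haskell
import Data.IORef (IORef, newIORef, modifyIORef, readIORef)

-- Simplified, hypothetical types for illustration only.
data Source o
  = Yield o (IO ()) (Source o)  -- a value, a finalizer to run if downstream stops here, the rest
  | Closed                      -- the termination signal

data Sink i r
  = Await (i -> Sink i r) r     -- handler for the next value; result to use if the stream closes
  | Result r                    -- finished, possibly before the stream closes

-- Connect a Source to a Sink and return the Sink's result.
connect :: Source o -> Sink o r -> IO r
connect (Yield _ fin _)  (Result r)  = fin >> pure r  -- Sink finished first: finalize immediately
connect Closed           (Result r)  = pure r
connect Closed           (Await _ r) = pure r         -- stream closed: take the Sink's result
connect (Yield o _ rest) (Await k _) = connect rest (k o)

-- An endless Source that records in a log when its finalizer runs.
countFrom :: IORef [String] -> Int -> Source Int
countFrom logRef n =
  Yield n (modifyIORef logRef ("finalized" :)) (countFrom logRef (n + 1))

-- A Sink that collects at most n values and then finishes.
takeN :: Int -> Sink i [i]
takeN n = go n []
  where
    go k acc
      | k <= 0    = Result (reverse acc)
      | otherwise = Await (\x -> go (k - 1) (x : acc)) (reverse acc)

-- Run the endless Source against a Sink that stops after three values.
demo :: IO ([Int], [String])
demo = do
  logRef <- newIORef []
  xs     <- connect (countFrom logRef 1) (takeN 3)
  events <- readIORef logRef
  pure (xs, events)
```

In this sketch the Source never gets to send Closed: the Sink returns its result after three values, connect runs the pending finalizer once, and nothing further is read. A Source that reaches Closed on its own is assumed to have released its resources already, which is why no finalizer is run in that case.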
There's one other basic type that rounds out conduit: the Conduit type. This is a bit of a hybrid between a Source and a Sink. It can receive a stream of data from a Source, just like a Sink can, and it can send a stream of data to a Sink. When it's finished sending its stream, it sends a termination signal to the Sink. And if the Sink finishes before that, the Conduit needs to shut itself down immediately. (The Source should shut down immediately as well.) Again, as with the Source, there's no need to send a termination signal if the Sink finishes first.
We have a similar set of interactions between the Source and the Conduit. If the Source finishes first, it will send a termination signal to the Conduit, and then the Conduit will receive no more input. But if the Conduit finishes first, then the Source needs to shut down immediately.
The previous discussion had a lot of overlap in it. There seems to be a lot of common ground amongst Source, Conduit, and Sink. Let's see if we can sum up the behaviors they all share, and unify them all into a single type, called Pipe:
- Conduits and Sinks can both receive data from an incoming stream. This stream is ultimately finished with a terminating signal. A Source doesn't really receive such a stream, but we can fake such a stream as an infinite stream of dummy values (e.g., ()), or by immediately sending a terminating signal.
- Sources and Conduits can both produce a stream of values, followed by a terminating signal. A Sink can't produce such a stream, but again we can fake it with a stream of Voids (see the void package). Since it's impossible to produce values of type Void, we know that the Sink won't ever create output values.
- A Sink creates a result value. Let's come back to this.
- Sources and Conduits connect to some kind of receiver (a Conduit or Sink). Let's call the receiver downstream, and the sender upstream. If upstream sends a termination signal, then downstream will receive no more inputs. (It can continue processing if it wants to.) On the flip side, if downstream finishes, then upstream needs to finalize resources and finish immediately.
Let's come back to those termination signals and result values. Normally, I would think of a termination signal as containing no data (and in fact, historically, that's exactly how conduit behaved). But these termination signals are very similar to result types: they indicate when something is done, and when upstream should shut down. So let's go ahead and call them the same thing! Now Sources and Conduits can also produce result values, and when they produce them, it's a signal to downstream that there's no more input coming.
This gives us a nice symmetry. We can think of each component as receiving a stream of data until it receives the result from upstream, which seals that incoming stream. It also produces such a stream of values, sealing the stream with its own result value. The result value returned from the most downstream component will be the result value of the entire pipeline.
Let's take this discussion from the abstract to the concrete. We're going to have a type with five parameters. One will be the underlying Monad. Then we have two streams (input and output), and each one will have a type for the stream's values and a type for its termination signal, which as discussed above is a result value:
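data Pipe i o u m r
    = HaveOutput (Pipe i o u m r) (m ()) o                   -- next pipe, finalizer, output value
    | NeedInput (i -> Pipe i o u m r) (u -> Pipe i o u m r)  -- on input value, on upstream result
    | Done r                                                 -- finished, with the result value
    | PipeM (m (Pipe i o u m r))                             -- run a monadic action to get the next pipe
    | Leftover (Pipe i o u m r) i                            -- next pipe, unconsumed input handed back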
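
To see how the pieces described above fit together, here is a rough, self-contained sketch. It restates the type so that it compiles on its own, including a Done r constructor to carry the final result. Everything else is an assumption of this sketch rather than the library's actual code: the three type synonyms, the fuse and runPipe functions, and the sourceList, mapC and sumSink helpers are illustrative names and may not match conduit's real definitions.

```haskell
import Data.Functor.Identity (Identity, runIdentity)
import Data.Void (Void, absurd)

data Pipe i o u m r
  = HaveOutput (Pipe i o u m r) (m ()) o
  | NeedInput (i -> Pipe i o u m r) (u -> Pipe i o u m r)
  | Done r
  | PipeM (m (Pipe i o u m r))
  | Leftover (Pipe i o u m r) i

-- Assumed specializations: dummy () input for a Source, Void output for a Sink.
type Source m o    = Pipe () o () m ()
type Conduit i m o = Pipe i o () m ()
type Sink i m r    = Pipe i Void () m r

-- Connect upstream (left) to downstream (right). Downstream drives the
-- computation; `final` is upstream's pending finalizer.
fuse :: Monad m => Pipe a b r0 m r1 -> Pipe b c r1 m r2 -> Pipe a c r0 m r2
fuse = goRight (return ())
  where
    goRight final left right =
      case right of
        HaveOutput p c o -> HaveOutput (goRight final left p) (c >> final) o
        NeedInput rp rc  -> goLeft rp rc final left
        Done r2          -> PipeM (final >> return (Done r2))  -- downstream done: finalize upstream
        PipeM mp         -> PipeM (fmap (goRight final left) mp)
        Leftover p i     -> goRight final (HaveOutput left final i) p  -- replay leftover input

    goLeft rp rc final left =
      case left of
        HaveOutput left' final' o -> goRight final' left' (rp o)
        NeedInput lp lc           -> NeedInput (recurse . lp) (recurse . lc)
        Done r1                   -> goRight (return ()) (Done r1) (rc r1)  -- upstream result seals the stream
        PipeM mp                  -> PipeM (fmap recurse mp)
        Leftover left' i          -> Leftover (recurse left') i
      where
        recurse = goLeft rp rc final

-- Run a complete pipeline: dummy input, no possible output.
runPipe :: Monad m => Pipe () Void () m r -> m r
runPipe (HaveOutput _ _ o) = absurd o
runPipe (NeedInput _ c)    = runPipe (c ())
runPipe (Done r)           = return r
runPipe (PipeM mp)         = mp >>= runPipe
runPipe (Leftover p _)     = runPipe p

sourceList :: Monad m => [o] -> Source m o
sourceList []       = Done ()
sourceList (x : xs) = HaveOutput (sourceList xs) (return ()) x

mapC :: Monad m => (a -> b) -> Conduit a m b
mapC f = NeedInput (\x -> HaveOutput (mapC f) (return ()) (f x)) (\() -> Done ())

sumSink :: Num a => Sink a m a
sumSink = go 0
  where
    go acc = NeedInput (\x -> go (acc + x)) (\() -> Done acc)

-- Double each element, then sum; the result comes from the most downstream component.
example :: Integer
example = runIdentity (runPipe (sourceList [1, 2, 3] `fuse` mapC (* 2) `fuse` sumSink))
```

Note how the two halves of the earlier discussion show up in fuse: when downstream reaches Done, upstream's pending finalizer is run and upstream is dropped without being asked for anything more, and when upstream reaches Done, its result is handed to downstream's second NeedInput continuation in place of a separate termination signal.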