Promistreams 'getting started'

Written on: 2024/08/19. Note that this was written while Promistreams are still being polished, so if it's been a while since this date, it's probably outdated!

This is a brief explanation of how Promistreams work, and how to use them, in their current early-ish state.

The core model is pretty well-defined by this point, and major changes in internal structure are not expected to happen. Compatibility is unlikely to be broken in major ways up to the 1.0.0 release, but occasionally you may need to update a few of the libraries at once to keep things working together perfectly. This will typically only involve a version bump, no code changes.

What to expect

Promistreams are pull-based streams. This means that a Promistream does nothing until a value is requested from it, directly or indirectly. This mirrors how streams work in many other languages, and is unlike Node.js streams.

Another thing that is unlike Node.js streams is that a Promistream is always piped into exactly one other stream at a time, though which stream that is may change over time.

Additionally, Promistreams provide the following features:

  • Promise-oriented; 'reading'/running a pipeline returns a Promise, and all internal callbacks work with Promises (including async/await) out of the box. No manual callback wiring!
  • Well-defined and consistent error handling.
  • Well-defined and consistent cancellation/abort behaviour, including automatically on error conditions.
  • Safe concurrent use, ie. having multiple "in-flight" values in the pipeline at once.
  • Full interoperability with arbitrary other stream/sequence-shaped types; currently iterables, async iterables and Node.js streams are implemented, but others will follow (please let me know if you need any particular ones!).
  • Branching and converging/merging streams, supporting arbitrary distribution patterns, to accommodate cases where one stream needs to be piped into multiple other streams or vice versa.
  • Each stream has precise control over when it reads from upstream, and provides results to downstream.
  • Composability of streams.

What not to expect

There are a few things that Promistreams do not prioritize in their design.

Maximum performance. While of course stream implementations will be optimized as much as possible, and a simple core model helps to ensure that, Promistreams don't aim for performance as the #1 goal. This is because doing so would require serious tradeoffs on usability and reliability. In practice, the performance is still quite good, and more than enough for the majority of real-world usecases. Likewise, a lot of work has gone into not making the internals more complex than necessary. But if you are trying to squeeze out every last byte of memory and every clock cycle, Promistreams are probably not the right choice for you.

API familiarity. While Promistreams take inspiration from a number of other stream implementations - most notably pull-streams and futures.rs streams - they do not aim to mirror any one particular streams API. Instead, the API is optimized for usability of the specific model that Promistreams implement, and specifically for their use in Javascript.

Aesthetics. While the API is designed to be predictable and easy to reason about, and to roughly represent a pipeline, it does not necessarily look aesthetically nice, and some patterns - particularly branching and converging - may look a bit strange or ugly. The choice was made to optimize for predictability and ergonomics over aesthetic quality, where these goals conflict.

Seekable streams. Like most stream implementations, Promistreams are read-to-end streams, and do not support seeking within streams (although they do support infinite streams, including queues!). If you need seeking capabilities, I would recommend creating a custom abstraction that eg. lets you specify a starting offset and then dispenses a Promistream that starts reading from that offset, and uses happy aborts to stop reading. This way, you get the ergonomics of a Promistream, but can still read specific segments of a resource.
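
For illustration, here's a minimal sketch of such an abstraction. It assumes that @promistream/from-iterable accepts async iterables (as described further down), and uses a plain Node.js read stream with a start offset as the underlying data source; the readFromOffset name and the file path are made up for this example, and the 'happy abort' part is left out since aborting isn't covered in this post:

"use strict";

const fs = require("fs");

const pipe = require("@promistream/pipe");
const fromIterable = require("@promistream/from-iterable");
const collect = require("@promistream/collect");

// Hypothetical helper: dispenses a Promistream that reads from `offset` to the end of the file.
// A Node.js readable stream is an async iterable of Buffers, so from-iterable can wrap it.
function readFromOffset(path, offset) {
	return fromIterable(fs.createReadStream(path, { start: offset }));
}

(async function() {
	let chunks = await pipe([
		readFromOffset("./example.bin", 1024), // made-up file and offset
		collect()
	]).read();

	console.log(Buffer.concat(chunks).length);
})();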

Non-object mode. Node.js streams have two modes; 'regular' mode (Buffers/strings only) and 'object' mode (everything else). Promistreams only have object mode. You can still work with Buffers and strings as before, the streams design is just not specially aware of them, and how they are handled is entirely decided by the specific streams you use.

So this is a library?

Not exactly. I am building this as an interoperable specification, and it's designed so that no libraries are required to make use of Promistreams; the internal structure of a Promistream is very simple, and contains the absolute minimum complexity needed to have reliably composable streams.

That having been said, libraries that implement this specification are provided at every level of abstraction - including high-level streams for specific usecases as well as low-level utilities. This means that you can use it like a library if you want to, but you can also use it as a spec. These libraries are highly modular; you only install the parts you actually need. Other future implementations of the specification may make different distribution choices.

Currently, the spec is not complete, and is still subject to change. In practice that means that only use-as-a-library is currently viable for real-world projects. The spec is partly written and is pending more real-world experimentation with the current implementation, to find the rough edges and polish them before publishing the specification and 'locking in' the design.

Likewise, many of the libraries are currently missing proper documentation. However, almost every Promistream library that currently exists includes an example.js that demonstrates how to use that particular library or stream in your code. Combined with the introduction in this post, that should get you quite far! And if you're stuck, don't hesitate to ask - those questions also help to build out the documentation better.

Known issues

Currently there is a single known issue: a design change was made fairly recently, in the process of formalizing the spec, where it was decided that a 'happy abort' (ie. a cancellation under expected circumstances, like the successful completion of a process that did not need to consume all upstream data) should be exposed by the pipeline as an EndOfStream (as if the source stream ran out of data entirely) instead of an Aborted.

You may run into some older implementations of source streams that still implement the old behaviour, as this change has not been applied everywhere yet. Please report it when this happens, and they will be fixed promptly!

Okay, so how do I use it?

Here's a simple example of a valid Promistream pipeline:

"use strict";

const pipe = require("@promistream/pipe");
const fromIterable = require("@promistream/from-iterable");
const map = require("@promistream/map");
const collect = require("@promistream/collect");

(async function() { 
	let numbers = await pipe([
		fromIterable([ 1, 2, 3, 4 ]),
		map((number) => number * 2),
		collect()
	]).read();

	console.log(numbers); // [ 2, 4, 6, 8 ]
})();

That's it! That's all there is to it. The call to read returns a Promise, and you can await it like any other Promise - if something goes wrong anywhere inside the pipeline, it automatically aborts the stream, running any teardown logic for each stream in the process, and then throws the original error that caused the failure. Otherwise, you get back whatever output the collect stream produced - which is simply an array of every value it has read from upstream.

In this example, the fromIterable is what is known as the source stream - it provides the original data - and the collect stream is what's known as the sink stream, which is responsible for reading out the entire pipeline until it is satisfied, which usually means "the source stream has run out of data" (but it can choose to behave differently!). The streams inbetween, just map in this case, are transform streams.

Look carefully at the pipe invocation, and how read is called on it. This is necessary to 'kickstart' the pipeline. The only thing that pipe does is to compose a series of Promistreams into one combined stream, automatically wiring up both ends, and you still need to call read on the result to cause the last stream in that pipeline to start reading stuff. Doing so is basically equivalent to calling read on the collect stream directly, with the streams before it as an argument; the pipe function just wires this up for you.

Now this example is not very interesting, because everything is synchronous. But it still works the exact same if we do something asynchronous:

"use strict";

const pipe = require("@promistream/pipe");
const fromIterable = require("@promistream/from-iterable");
const map = require("@promistream/map");
const collect = require("@promistream/collect");

(async function() { 
	let numbers = await pipe([
		fromIterable([ 1, 2, 3, 4 ]),
		map(async (number) => await doubleNumberRemotely(number)), // I have no idea why you would want to do this
		collect()
	]).read();

	console.log(numbers); // [ 2, 4, 6, 8 ]
})();

We've replaced the map callback with one that is asynchronous, and it still works the exact same way! Of course in a real-world project it would be absurd to use a remote service for doubling numbers, but this keeps the example simple to follow. The asynchronous logic could be anything - as long as it returns a Promise.
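
As a runnable variation of the above - doubleNumberRemotely is of course not a real function - here's the same pipeline with an artificial delay standing in for the asynchronous work. The delay helper is plain local code, not part of any Promistream package:

"use strict";

const pipe = require("@promistream/pipe");
const fromIterable = require("@promistream/from-iterable");
const map = require("@promistream/map");
const collect = require("@promistream/collect");

function delay(ms) {
	return new Promise((resolve) => setTimeout(resolve, ms));
}

(async function() {
	let numbers = await pipe([
		fromIterable([ 1, 2, 3, 4 ]),
		map(async (number) => {
			await delay(10); // pretend this is a network call or database query
			return number * 2;
		}),
		collect()
	]).read();

	console.log(numbers); // [ 2, 4, 6, 8 ]
})();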

What if we want to reuse this logic in multiple pipelines? We could just have a function that generates a custom map stream on-demand. It would need to be a function that creates one, because each pipeline would still need its own map instance - streams are single-use! It might look like this:

"use strict";

const pipe = require("@promistream/pipe");
const fromIterable = require("@promistream/from-iterable");
const map = require("@promistream/map");
const collect = require("@promistream/collect");

function double() {
	// We're using the synchronous version here again, because doubling numbers remotely was a terrible idea!
	return map((number) => number * 2);
}

(async function() { 
	let numbers = await pipe([
		fromIterable([ 1, 2, 3, 4 ]),
		double(),
		collect()
	]).read();

	console.log(numbers); // [ 2, 4, 6, 8 ]
})();

As you can see, it's all just functions returning Promises (or values, or throwing errors - the rules are the same as within any async context).

But what if we want to make a more complicated custom stream? Let's filter out all the numbers that are a multiple of ten!

"use strict";

const pipe = require("@promistream/pipe");
const fromIterable = require("@promistream/from-iterable");
const map = require("@promistream/map");
const filter = require("@promistream/filter");
const collect = require("@promistream/collect");

function doubleAndFilter() {
	return pipe([
		map((number) => number * 2),
		filter((number) => number % 10 > 0)
	]);
}

(async function() { 
	let numbers = await pipe([
		fromIterable([ 1, 2, 3, 4 ]),
		doubleAndFilter(),
		collect()
	]).read();

	console.log(numbers); // [ 2, 4, 6, 8 ]
})();

Wait, what's pipe doing there? Well, think back to this earlier comment:

The only thing that pipe does is to compose a series of Promistreams into one combined stream, automatically wiring up both ends, and you still need to call read on the result to cause the last stream in that pipeline to start reading stuff.

We're now getting to the reason why this is the case, and why the composing and reading of pipelines are split up! Because the composing doesn't cause any reads, and it doesn't require a complete pipeline (with a data source and a sink), you can also use it to compose together multiple streams into a single reusable stream, that can then be inserted into another pipeline!

Since we're now both mapping and filtering, this is exactly what we need - some way to represent those two streams, in sequence, with pre-specified callbacks, exposed externally as a single custom stream - and so we use pipe to accomplish that. The resulting pipeline works just as if you were to manually insert both of those streams after one another, with the same behaviours and error handling. This is the key to what makes Promistreams composable.

This pretty much covers the basic use of Promistreams. There are many different types of streams, including branching streams, that change the exact behaviour of the pipeline; but the basic operation always looks like this. Pipe together an array of streams, call read on the resulting pipeline once it's finalized, or return the un-read pipeline if it's meant to be used as a composite stream. Even the more complex pipelines still work like this.

The behaviours and responsibilities of different types of streams

While you don't need to know much about the internals of Promistreams to use the libraries, there are a few things that are useful to know, mostly around which streams are responsible for what. In the Promistreams design, much of the behaviour is 'emergent'; it's not enforced by some central runtime or orchestrator, but rather is the emergent result of different parts of the system behaving in certain defined ways.

For example, you might think that the pipe function does error handling, but it doesn't! All of the error handling is emergent from the design, and simply a result of how Promises work - a pipeline is essentially just a very long chain of nested Promise callbacks, internally. All that pipe does is a bit of bind magic to pass the previous stream into the next one.

However, some things do need to be defined to make things like error handling and concurrency work correctly. The decision was made to shift this burden to the source and sink streams, as these are the least likely to require a custom implementation - the result is that a transform stream is not much more than an async function, and does not need to care about error handling at all if it doesn't want to act on those errors. Errors will simply propagate through them with the usual throw/rejection mechanisms of Promises.

The source and sink streams need to do a bit more; they are responsible for emitting 'markers' and handling rejections, respectively. The 'markers' are EndOfStream and Aborted, and these are rejected and propagated like an error would be, but they are specially recognized by (some of the) streams inbetween, as well as the sink stream. They're used for teardown code and, in the case of the sink stream, to generate the appropriate 'consumer-facing' error to throw from the pipeline as a whole.

The basic read process looks like this: you call the read function on the pipeline, which calls the read function on the last stream in it, the sink stream. The sink stream is responsible for 'driving' the pipeline in some way, though exactly what that looks like will depend on the stream implementation. It is valid for a read on the pipeline to only trigger a single upstream read, but that is generally not useful - more typically, the sink stream will start a read loop. The stream upstream from it will call read on its upstream, and so on, recursively, until a value is read from the source stream. Any stream inbetween may modify the result, discard values, combine them, read more or fewer times, and so on. Once the source stream runs out of values, it will start dispensing EndOfStream markers, which will propagate down like an error, and ultimately signal to the sink stream that it should stop any read loops.

The basic abort process looks like this: abort is called on any stream in the pipeline, that stream calls abort on its upstream, which does the same recursively, until it ends up at the source stream. The source stream internally 'latches' into 'aborted' mode, and starts dispensing Aborted markers on subsequent reads, which are thrown/rejected and therefore propagate back downstream, until they eventually end up at the sink stream, which unpacks the original error stored within the Aborted marker and throws it from its read call (and therefore the pipeline's read call). Subsequent attempts at reading the sink stream will throw the Aborted marker itself, so the original error is not duplicated.

(The details are more complicated, and if the abort is a happy abort, rather than one based on an Error, the same latching occurs but with EndOfStream instead of Aborted. Further details will be in the spec.)

When any stream in the pipeline throws an error or rejects a Promise in its read callback, this propagates downstream like any error would, until it is received by the sink stream. It then initiates the abort process described above.
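
To make that concrete, here's a small sketch showing how an error thrown inside a map callback surfaces from the pipeline's read call; the failing callback and the error message are made up for illustration:

"use strict";

const pipe = require("@promistream/pipe");
const fromIterable = require("@promistream/from-iterable");
const map = require("@promistream/map");
const collect = require("@promistream/collect");

(async function() {
	try {
		await pipe([
			fromIterable([ 1, 2, 3, 4 ]),
			map((number) => {
				// A contrived failure, to demonstrate error propagation.
				if (number === 3) {
					throw new Error("I don't like threes");
				}
				return number * 2;
			}),
			collect()
		]).read();
	} catch (error) {
		// The pipeline was aborted, and the original error is rethrown here.
		console.log(error.message); // I don't like threes
	}
})();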

Note that because Aborted and EndOfStream markers are thrown/rejected, transform streams inbetween the source and the sink do not need to care about them, unless they intend to implement some kind of teardown logic, in which case the markers can be intercepted and then re-thrown. But normally they propagate like any rejection would in a chain of Promises, because that is essentially what they are!

Things not covered here

Here are some of the things not covered in this introduction:

  • Interoperating with Node streams (see example.js in @promistream/from-node-stream - this is pretty trivial and it's even entirely valid to only use Node streams in your pipeline, using this wrapper)
  • Concurrency (this is illustrated in the example.js for the @promistream/parallelize package)
  • Branching (this is illustrated in the example.js of various @promistream/fork-* packages)
  • Converging/merging (currently only illustrated in the example.js of the @promistream/merge-concatenate package - merge streams are still being worked on)
  • The exact details of what source/sink streams are responsible for (part of the unfinished spec)
  • How the internal peek API works (this is responsible for making concurrency work reliably)

Most useful packages/libraries

All of the existing Promistream packages can be found in the package list, but they're not very well-explained. The majority of these should be functional and have an example.js demonstrating their use.

Here's a selection of the packages you are most likely to need:

Common cases

  • @promistream/pipe: the core package that pipes streams together into a pipeline. Technically optional but strongly recommended to use.
  • @promistream/debug: transform stream that simply prints everything that goes through it, optionally with a label. Only for pipeline debugging use. (See the sketch after this list.)
  • @promistream/simple-source: low-level source stream abstraction. Implements the specification responsibilities, leaving you to worry only about how to produce values. Suitable for the majority of usecases.
  • @promistream/simple-sink: low-level sink stream abstraction. Same as above, but on the other end.
  • @promistream/map: like the array method, but as a Promistream. Also functions as a general-purpose low-level transform stream.
  • @promistream/filter: like the array method, but as a Promistream.
  • @promistream/collect: high-level sink stream, that simply read-loops and collects all read values into an array, then resolves with that array. Often what you want. Also a good example of @promistream/simple-sink use, internally.
  • @promistream/from-node-stream: source/sink/transform wrappers for all types of Node.js streams, to integrate them with a Promistream pipeline.
  • @promistream/from-iterable: creates source stream from a synchronous or asynchronous iterable (including arrays).
  • @promistream/range-numbers: high-level source stream, generates numbers in a specified range.
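
As a quick sketch of how a few of these fit together, here's a pipeline with a debug stream dropped in between two stages. The assumption that debug takes an optional label string is based on its description above, so check its example.js for the exact usage:

"use strict";

const pipe = require("@promistream/pipe");
const fromIterable = require("@promistream/from-iterable");
const map = require("@promistream/map");
const debug = require("@promistream/debug");
const collect = require("@promistream/collect");

(async function() {
	let numbers = await pipe([
		fromIterable([ 1, 2, 3 ]),
		map((number) => number * 2),
		debug("after doubling"), // assumed: an optional label for the printed values
		collect()
	]).read();

	console.log(numbers); // [ 2, 4, 6 ]
})();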

Complex cases

  • @promistream/buffer: reads an array (of 0 or more items) from upstream, and then dispenses the values in that array (if any) one by one on subsequent reads by its downstream. Often composed with others. (See the sketch after this list.)
  • @promistream/dynamic: lets you pass a value through different streams/pipelines depending on the value. Finicky and high-overhead; usually fork-and-merge is a better option.
  • @promistream/parallelize: lets you run N reads (up to and including Infinity) simultaneously.
  • @promistream/sequentialize: forces inbound reads from downstream to occur sequentially, 'protecting' its upstream. Mandatory to use if your stream does not support parallel operation and you intend to publish it, or you use parallelize elsewhere in your pipeline.
  • @promistream/rate-limit: as the name implies, throttles reads going through it but leaves results otherwise unmodified.
  • @promistream/simple-queue: high-level source stream that functions as a task queue; items can be added externally.
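
To make the buffer behaviour concrete, here's a small sketch; it assumes that buffer takes no arguments, which may not match the actual API, so check its example.js to be sure:

"use strict";

const pipe = require("@promistream/pipe");
const fromIterable = require("@promistream/from-iterable");
const buffer = require("@promistream/buffer");
const collect = require("@promistream/collect");

(async function() {
	let items = await pipe([
		fromIterable([ [ 1, 2 ], [], [ 3, 4, 5 ] ]), // upstream produces arrays...
		buffer(), // ... and buffer dispenses their items one at a time
		collect()
	]).read();

	console.log(items); // [ 1, 2, 3, 4, 5 ]
})();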

Specific usecases

  • @promistream/read-file: as the name implies. Produces buffers. (See the sketch after this list.)
  • @promistream/decode-string: as the name implies. Takes buffers, produces strings.
  • @promistream/split-lines: as the name implies. Takes strings.
  • @promistream/parse-xml: streaming XML parser.
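
The file-handling streams above are typically composed into a single pipeline. The sketch below assumes that read-file takes a file path, decode-string takes an encoding name, and split-lines takes no arguments; those signatures are guesses based on the descriptions above, so check each package's example.js for the real usage:

"use strict";

const pipe = require("@promistream/pipe");
const readFile = require("@promistream/read-file");
const decodeString = require("@promistream/decode-string");
const splitLines = require("@promistream/split-lines");
const collect = require("@promistream/collect");

(async function() {
	let lines = await pipe([
		readFile("./example.txt"), // assumed: path argument; produces Buffers
		decodeString("utf8"), // assumed: encoding argument; produces strings
		splitLines(), // assumed: no arguments; produces one string per line
		collect()
	]).read();

	console.log(lines.length);
})();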

There are other Promistream packages, and there will be many more! These are just some of the ones currently available, that you're likely to need at some point.

Troubleshooting

My process just exits and/or my pipeline doesn't run!

You most likely forgot to call .read() on the pipeline. This is easy to forget. I still do it regularly!

I get a weird error!

All @promistream libraries are meant to produce clear and understandable errors. If they do not, that is a bug. Please report it!

(Currently this is most likely to happen because of a library not being updated for a newer revision of the specification; I'll help figure out what's going on if you report it.)

How do I...?

Is there an off-the-shelf package for it? Give that a shot first. If there isn't, or it doesn't work as you expect, please let me know and I'll help you figure it out!
