Streams are an important concept in programming: they let you handle high volumes of data without consuming much memory. A stream is an object that holds internal state (say, the reader position in a file) and can produce new values on request.
Here's an example of a reader stream that reads a file in chunks in Swift 4.2:
```swift
final class ChunkReader {
    let handle: FileHandle
    let chunkSize: Int

    init?(path: String, chunkSize: Int, start: UInt64 = 0) {
        guard let handle = FileHandle(forReadingAtPath: path) else {
            return nil
        }
        self.handle = handle
        self.chunkSize = chunkSize
        guard start > 0 else { return }
        handle.seek(toFileOffset: start)
    }

    func next() -> Data {
        return handle.readData(ofLength: chunkSize)
    }
}
```
Pros of this approach:

- works fine with a small number of states and state transitions

Cons:

- requires a separate class declaration
- splits the implementation into multiple functions (`init` and `next` in this case)
- lack of support in the standard library: the only standard protocol that could fit this shape is `IteratorProtocol`.
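For comparison, here's what the same class-based approach looks like in TypeScript (a sketch; the class and file names are made up for illustration). The iterator protocol likewise has to be wired up by hand:

```typescript
import { openSync, readSync, closeSync, writeFileSync } from 'fs';

// a class-based chunk reader: state lives in instance fields,
// and iteration support must be implemented manually
class ChunkReader implements IterableIterator<string> {
  private fd: number;
  private offset: number;

  constructor(path: string, private chunkSize: number, start = 0) {
    this.fd = openSync(path, 'r');
    this.offset = start;
  }

  next(): IteratorResult<string> {
    const buffer = Buffer.alloc(this.chunkSize);
    const bytesRead = readSync(this.fd, buffer, 0, this.chunkSize, this.offset);
    if (bytesRead === 0) {
      closeSync(this.fd);
      return { value: undefined, done: true };
    }
    this.offset += bytesRead;
    return { value: buffer.toString('utf8', 0, bytesRead), done: false };
  }

  // makes the class usable in for...of, the analogue of IteratorProtocol
  [Symbol.iterator](): IterableIterator<string> {
    return this;
  }
}

// usage: write a small test file, then read it back in chunks
writeFileSync('chunk-reader-test.txt', 'abcdefghij');
const chunks: string[] = [];
for (const chunk of new ChunkReader('chunk-reader-test.txt', 4)) {
  chunks.push(chunk);
}
console.log(chunks); // → [ 'abcd', 'efgh', 'ij' ]
```

Note how the state (`fd`, `offset`) is split across instance fields and the implementation across `constructor` and `next`, the same shape as the Swift class above.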
For years, Python, C#, JavaScript, and other popular languages have supported generator syntax, which provides a standard approach to implementing streams.
For example, here's a file reader stream with generators in JavaScript/TypeScript using Node.js's `fs` module. Note the `function*` and `yield` keywords:
```typescript
import { openSync, readSync, writeFileSync } from 'fs';

function* chunkReader(path: string, chunkSize: number, start: number = 0): IterableIterator<string> {
  // 'r' - open the file for reading;
  // an exception occurs if the file does not exist
  const fileDescriptor = openSync(path, 'r');
  const buffer = Buffer.alloc(chunkSize);
  let offset = start;
  let bytesRead = readSync(fileDescriptor, buffer, 0, chunkSize, offset);
  while (bytesRead > 0) {
    // returns the chunk from the `.next()` call below
    // and suspends until `.next()` is called again
    yield buffer.toString('utf8', 0, bytesRead);
    offset += bytesRead;
    bytesRead = readSync(fileDescriptor, buffer, 0, chunkSize, offset);
  }
}

// generating test data: a range of integers from 0 to 41
const testData = JSON.stringify([...Array(42).keys()]);
writeFileSync('test.json', testData);

// a sequence of chunks from 'test.json'
const stream = chunkReader('test.json', 5);

// first stream chunk
console.log(stream.next());
// prints `{ value: '[0,1,', done: false }`

// resume the generator and print the rest of the chunks
for (const chunk of stream) {
  console.log(chunk);
}
```
And here's my pitch for generator support in Swift (feedback and bikeshedding in the comments welcome!):
```swift
func chunkReader(path: String, chunkSize: Int, start: UInt64 = 0) -> Generator<Data> {
    return Generator {
        guard let handle = FileHandle(forReadingAtPath: path) else {
            return
        }
        handle.seek(toFileOffset: start)
        var data: Data
        repeat {
            data = handle.readData(ofLength: chunkSize)
            yield data
        } while !data.isEmpty
    }
}

let stream = chunkReader(path: "test.json", chunkSize: 5)
for chunk in stream {
    // prints every 5-byte chunk of test.json
    print(chunk)
}
```
Pros of this generator syntax:

- Less code than in the class version.
- All code, state, and state transitions are consolidated in a single declaration.
- A generator declaration looks just like a function declaration.
- Syntax similar to common functions also allows generator closures that can capture variables declared in an outer scope.
- State is expressed as local variables, not as instance variables.
- `for ... in ... { ... }` iteration syntax for free, no need to implement `IteratorProtocol`.
- Returned generator objects are still first-class in the same way as closures and can be passed around as needed.
Cons:
- When there are many state transitions, it's better to split a huge generator into multiple smaller ones, which is arguably not a bad thing, as generators of a standard shape compose better than ad-hoc stream classes.
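To illustrate the composition point in runnable form, here's a TypeScript sketch with two hypothetical combinators, `map` and `take`, chained over an infinite generator:

```typescript
// naturals() is an infinite stream; map and take are small
// generator combinators that compose without any ad-hoc classes
function* naturals(): IterableIterator<number> {
  for (let n = 0; ; n += 1) {
    yield n;
  }
}

function* map<T, U>(source: Iterable<T>, transform: (value: T) => U): IterableIterator<U> {
  for (const value of source) {
    yield transform(value);
  }
}

function* take<T>(source: Iterable<T>, count: number): IterableIterator<T> {
  for (const value of source) {
    if (count-- <= 0) {
      return;
    }
    yield value;
  }
}

// squares of the first five naturals, computed lazily
const squares: number[] = [];
for (const square of take(map(naturals(), n => n * n), 5)) {
  squares.push(square);
}
console.log(squares); // → [ 0, 1, 4, 9, 16 ]
```

Each combinator is itself a generator of the standard shape, so any of them can be swapped out or extended without touching the others.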
In more complex scenarios streams can throw errors. Consider an HTTP reader stream with these possible states:
```swift
enum HTTPReaderState {
    case initialized
    case dnsResolved(ipAddress: String)
    case connectionEstablished
    case requestWritten
    case headersParsed(headers: [String: String])
    case redirectRequired(redirected: URL)
    case bodyChunkDownloaded(chunk: Data)
    case connectionClosed
}
```
For a user who wants to know the current progress of a stream, whether to display it in the UI or for logging purposes, it would make sense to implement it as a generator that produces these states:
```swift
func httpReader(url: URL) -> Generator<HTTPReaderState> {
    return Generator {
        var redirectURL: URL?
        var socket: Socket!
        repeat {
            let ipAddress = try resolveDNS(redirectURL ?? url)
            yield .dnsResolved(ipAddress: ipAddress)
            socket = try establishConnection(ipAddress)
            yield .connectionEstablished
            try writeHTTPRequest(socket, redirectURL ?? url)
            yield .requestWritten
            let statusCode = try parseHTTPStatusCode(socket)
            let headers = try parseHTTPHeaders(socket)
            yield .headersParsed(headers: headers)
            if isRedirectCode(statusCode) {
                redirectURL = headers["Location"].flatMap { URL(string: $0) }
                if let redirect = redirectURL {
                    yield .redirectRequired(redirected: redirect)
                }
            } else {
                redirectURL = nil
            }
        } while redirectURL != nil
        var data = try readHTTPBody(socket)
        while !data.isEmpty {
            yield .bodyChunkDownloaded(chunk: data)
            data = try readHTTPBody(socket)
        }
        socket.close()
        yield .connectionClosed
    }
}
```
For throwing generators the current `for ... in` syntax and `IteratorProtocol` wouldn't work, because the producing function would look like `next() throws -> HTTPReaderState`, which doesn't match the non-throwing `next()` member of `IteratorProtocol`. That's not a big problem: you would still be able to iterate through a generator manually:
```swift
let downloadStream = httpReader(url: URL(string: "https://httpbin.org/get")!)
while let state = try? downloadStream.next() {
    print(state)
}
```
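JavaScript generators already work this way: a throw inside the body surfaces from the `next()` call that resumed it, so a failing stream is consumed with an ordinary try/catch. A runnable sketch:

```typescript
// a stream that fails partway through: the throw propagates
// out of the next() call that resumed the generator
function* failingStream(): IterableIterator<number> {
  yield 1;
  yield 2;
  throw new Error('connection lost');
}

const states: number[] = [];
try {
  for (const state of failingStream()) {
    states.push(state);
  }
} catch (error) {
  console.log((error as Error).message); // → connection lost
  states.push(-1); // record the failure
}
console.log(states); // → [ 1, 2, -1 ]
```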
In this proposal the interface of the `Generator` type would look like this:
```swift
// this is only the interface, not the implementation
final class Generator<Output>: IteratorProtocol {
    // describes the possible generator status
    enum Status {
        case suspended
        case failed(Error)
        case finished
    }

    private(set) var status: Status = .suspended

    // get the output of a generator with these functions
    func next() -> Output?
    func try next() -> Output?

    // cancelling a generator
    func stop()
    func fail(error: Error)
}
```
With generators implemented as above, you can only consume a generator's values, but can't feed back any input values in a straightforward way. I propose introducing a `Coroutine` type inspired by Python's coroutines. JavaScript's generators allow this too, but there is no official terminological distinction between generators and coroutines in JavaScript, although they work almost identically to Python's version.
So the proposed Swift version could look like this:
```swift
// this is only the interface, not the implementation
final class Coroutine<Input, Output> {
    // describes the possible coroutine status
    enum Status {
        case suspended
        case failed(Error)
        case finished
    }

    private(set) var status: Status = .suspended

    // get the output of a coroutine with these functions
    func next(_ input: Input) -> Output?
    func try next(_ input: Input) -> Output?

    // cancelling a coroutine
    func stop()
    func `throw`(error: Error)
}
```
A simple example of using the `Coroutine` type and `yield` expressions in Swift:
```swift
// proposed coroutine syntax: the first `next` input is explicit
let co = Coroutine<Int, Int> { firstInput in
    print("coroutine starts")
    print("firstInput is \(firstInput)")
    var input = yield 42
    print(input)
    input = yield 24
    print(input)
    print("coroutine ends")
}

print(co.next(5))
// the coroutine body prints "coroutine starts" and "firstInput is 5"
// this prints `Optional(42)`
print(co.next(10))
// the coroutine body prints `10`
// this prints `Optional(24)`
print(co.next(15))
// the coroutine body prints `15` and "coroutine ends"
// this prints `nil`
```
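The same interaction can be verified today with a TypeScript generator. One difference: the first input goes to the generator function itself, because the argument of the very first `next()` call is discarded in JavaScript (which is exactly why the pitch above makes the first input explicit):

```typescript
function* co(firstInput: number): Generator<number, void, number> {
  console.log('coroutine starts');
  console.log(`firstInput is ${firstInput}`);
  let input = yield 42;
  console.log(input);
  input = yield 24;
  console.log(input);
  console.log('coroutine ends');
}

const gen = co(5);
const outputs = [gen.next().value, gen.next(10).value, gen.next(15).value];
// the body printed: "coroutine starts", "firstInput is 5", 10, 15, "coroutine ends"
console.log(outputs); // → [ 42, 24, undefined ]
```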
By analogy with the reader stream implemented previously, let's implement a writer stream with a coroutine:
```swift
func chunkWriter(path: String, start: UInt64 = 0) -> Coroutine<Data, ()> {
    return Coroutine { firstInput in
        guard let handle = FileHandle(forWritingAtPath: path) else {
            return
        }
        handle.seek(toFileOffset: start)
        var data = firstInput
        while !data.isEmpty {
            handle.write(data)
            data = yield
        }
    }
}
```
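For reference, here's a runnable TypeScript counterpart of this writer: a generator that receives chunks through `next()` and stops on an empty chunk (the file name is made up for illustration):

```typescript
import { openSync, writeSync, closeSync, readFileSync } from 'fs';

// a push-style writer: each next(chunk) call feeds one chunk,
// and an empty chunk ends the stream and closes the file
function* chunkWriter(path: string): Generator<void, void, string> {
  const fd = openSync(path, 'w');
  let data = yield; // suspend until the first chunk arrives
  while (data !== '') {
    writeSync(fd, data);
    data = yield;
  }
  closeSync(fd);
}

const writer = chunkWriter('chunk-writer-test.txt');
writer.next(); // start the generator; it suspends waiting for a chunk
for (const chunk of ['[0,1,', '2,3]', '']) {
  writer.next(chunk);
}
console.log(readFileSync('chunk-writer-test.txt', 'utf8')); // → [0,1,2,3]
```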
In this example we implement a streaming parser that converts a JSON array of integers to Swift's `[Int]`. The parser uses a nearly constant amount of memory, doesn't need to read the whole character sequence at once, and if there's a parsing error, it can still produce a partial result:
```swift
enum ParsingError: String, Error {
    case expectedOpenBracket = "expected \"[\""
    case expectedDigitOrSpace = "expected a digit or a space"
    case unexpectedInput = "unexpected input"
}

let whitespaceChars: [Character] = [" ", "\r", "\n", "\t"]

func arrayParser() -> Coroutine<Character, Int?> {
    return Coroutine { firstInput in
        // integer digits are accumulated in a buffer before they're parsed
        var acc = ""
        var input = firstInput
        // JSON arrays start with '['
        guard input == "[" else {
            throw ParsingError.expectedOpenBracket
        }
        // a value to yield
        var value: Int? = nil
        // spaces aren't permitted between digits, so we need
        // a special flag to handle that
        var previousIsSpace = false
        // flushes the accumulated buffer
        func flush() {
            // integer digits parsed
            value = Int(acc)
            acc = ""
        }
        // iterate until the closing bracket
        repeat {
            input = yield value
            // reset the next value to yield
            value = nil
            // special handling for whitespace characters
            if !whitespaceChars.contains(input) {
                switch input {
                case ",":
                    if !acc.isEmpty {
                        flush()
                    } else if !previousIsSpace {
                        throw ParsingError.expectedDigitOrSpace
                    }
                case "0"..."9" where !previousIsSpace || acc.isEmpty:
                    acc.append(input)
                case "]":
                    flush()
                default:
                    throw ParsingError.unexpectedInput
                }
                previousIsSpace = false
            } else {
                previousIsSpace = true
            }
        } while input != "]"
        yield value
    }
}
```
```swift
let parser = arrayParser()
for char in "[1, 2,3,4,5 , 42]" {
    if let result = parser.next(char) {
        print(result)
    }
}
// prints 1, 2, 3, 4, 5, and 42 on separate lines
```
Suppose you have a `Future` type in your toolbelt; this one is inspired by BrightFutures:
```swift
// this is only the interface, not the implementation
final class Future<Result> {
    init(_ result: Result)
    init(_ error: Error)
    // handlers return the future itself to allow chaining
    @discardableResult
    func onSuccess(_ handler: @escaping (Result) -> ()) -> Future<Result>
    @discardableResult
    func onFailure(_ handler: @escaping (Error) -> ()) -> Future<Result>
    // used below to chain futures together
    func flatMap<U>(_ transform: @escaping (Result) -> Future<U>) -> Future<U>
}
```
It turns out that the `async` functions you've seen in Python and JavaScript can be built in Swift in a relatively straightforward way. Every `async` function is a coroutine that `yield`s a future object and consumes the result of that future.
It's easy to illustrate that with a few task futures that produce results of the same type:
```swift
enum TaskResult {
    // `init` is a reserved keyword in Swift, hence `initial`
    case initial
    case result1
    case result2
    case result3
}

func task1(input: TaskResult) -> Future<TaskResult>
func task2(inputFromTask1: TaskResult) -> Future<TaskResult>
func task3(inputFromTask2: TaskResult) -> Future<TaskResult>
```

A future-based interaction with these tasks is a typical chain of closures:

```swift
task1(input: .initial)
    .flatMap {
        task2(inputFromTask1: $0)
    }
    .flatMap {
        task3(inputFromTask2: $0)
    }
    .onSuccess {
        print($0)
    }
    .onFailure {
        print($0)
    }
```
It requires you to chain futures with `flatMap`, and error handling is quite different from the usual `try`/`throws` workflow. Coroutines allow us to make asynchronous code look much more similar to ordinary synchronous code:
```swift
let co = Coroutine<TaskResult, Future<TaskResult>> { task1Input in
    let task1Result = yield task1(input: task1Input)
    let task2Result = yield task2(inputFromTask1: task1Result)
    yield task3(inputFromTask2: task2Result)
}
```
This coroutine `co` needs a coroutine runner to execute, which is relatively simple and short:
```swift
struct CoroutineAlreadyFinished: Error {}

func run<Result>(input: Result,
                 coroutine: Coroutine<Result, Future<Result>>) -> Future<Result> {
    do {
        guard let future = try coroutine.next(input) else {
            throw CoroutineAlreadyFinished()
        }
        // Status carries an associated Error, so pattern-match
        // instead of comparing with `==`
        if case .finished = coroutine.status {
            return future
        } else {
            return future.flatMap { run(input: $0, coroutine: coroutine) }
        }
    } catch {
        return Future<Result>(error)
    }
}
```
With this `run` helper, all coroutines of the same shape can be converted back to futures when needed (usually at the top level):
```swift
run(input: .initial, coroutine: co)
    .onSuccess {
        print($0)
        // prints `.result3` if all 3 tasks completed
    }
    .onFailure {
        print($0)
        // prints an error if any of the tasks failed at any stage
    }
```
The main drawback is that you need all tasks to produce the same `Result` type. That could be alleviated by changing `run` to operate on the `Coroutine<Any, Future<Any>>` type, but this would require conditional type casts at every `yield` expression. Since the return type of every task is known at compile time, a compiler transformation could be introduced that produces roughly the equivalent of what we've written above from this:
```swift
func taskChaining(task1Input: TaskResult) async -> TaskResult {
    let task1Result = await task1(input: task1Input)
    let task2Result = await task2(inputFromTask1: task1Result)
    return await task3(inputFromTask2: task2Result)
}
```
Interested in the details? Check out more examples and diagrams here and here. I look forward to your feedback. Any suggestions, comments, and questions are appreciated!