Skip to content

Instantly share code, notes, and snippets.

@achingbrain
Last active May 2, 2024 08:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save achingbrain/7e65ca748326d3b87b81508020a3321d to your computer and use it in GitHub Desktop.
Save achingbrain/7e65ca748326d3b87b81508020a3321d to your computer and use it in GitHub Desktop.
Streaming benchmark
import { Readable, Writable } from 'node:stream'
import { pipe } from 'it-pipe'
// Number of times each benchmark function is invoked, both in the warmup
// pass and again in the timed pass.
const ITERATIONS = 1000
/**
 * Build the payload used by every benchmark run.
 *
 * @returns {Uint8Array[]} a fresh array of 1024 distinct, zero-filled
 *   Uint8Arrays, each 1024 bytes long
 */
function createData () {
  const CHUNK_COUNT = 1024
  const CHUNK_SIZE = 1024

  return Array.from({ length: CHUNK_COUNT }, () => new Uint8Array(CHUNK_SIZE))
}
/**
 * Benchmark: pipe a Node.js Readable (built from the generated data) into a
 * Node.js Writable that collects every chunk.
 *
 * Resolves once the writable emits 'finish'; rejects with 'Short read' if the
 * number of collected chunks differs from the number generated.
 *
 * @returns {Promise<void>}
 */
async function nodeStreams () {
  const chunks = createData()
  const received = []

  await new Promise((resolve, reject) => {
    const source = Readable.from(chunks)
    const sink = new Writable({
      write (chunk, _encoding, done) {
        received.push(chunk)
        done()
      }
    })

    // 'finish' fires after the source has ended and all writes were flushed
    sink.on('finish', () => {
      if (received.length === chunks.length) {
        resolve()
      } else {
        reject(new Error('Short read'))
      }
    })

    source.pipe(sink)
  })
}
/**
 * Benchmark: wrap a Node.js Readable and Writable as web streams via
 * `Readable.toWeb` / `Writable.toWeb` and pipe one into the other.
 *
 * @returns {Promise<void>}
 * @throws {Error} 'Short read' if fewer/more chunks arrived than were generated
 */
async function nodeStreamsAsWebStreams () {
  const chunks = createData()
  const received = []

  const nodeReadable = Readable.from(chunks)
  const nodeWritable = new Writable({
    write (chunk, _encoding, done) {
      received.push(chunk)
      done()
    }
  })

  // convert the readable first, then the writable, then pipe
  await Readable.toWeb(nodeReadable).pipeTo(Writable.toWeb(nodeWritable))

  const complete = received.length === chunks.length
  if (!complete) {
    throw new Error('Short read')
  }
}
/**
 * Benchmark: pipe a web ReadableStream, fed one chunk per `pull`, into a web
 * WritableStream that collects every chunk.
 *
 * @param {object} [readableOpts] - extra underlying-source fields; they are
 *   spread after `pull`, so they can add to or override the source definition
 *   (e.g. `{ type: 'bytes' }`)
 * @returns {Promise<void>}
 * @throws {Error} 'Short read' if the collected chunk count differs from the
 *   generated chunk count
 */
async function webStreams (readableOpts = {}) {
  const chunks = createData()
  const received = []
  let cursor = 0

  const underlyingSource = {
    pull (controller) {
      if (cursor !== chunks.length) {
        controller.enqueue(chunks[cursor])
        cursor += 1
        return
      }

      // everything has been enqueued
      controller.close()
    },
    ...readableOpts
  }
  const underlyingSink = {
    write (chunk) {
      received.push(chunk)
    }
  }

  const source = new ReadableStream(underlyingSource)
  const sink = new WritableStream(underlyingSink)

  await source.pipeTo(sink)

  if (received.length !== chunks.length) {
    throw new Error('Short read')
  }
}
/**
 * Benchmark: same as `webStreams`, but with the readable configured as a
 * byte stream (`type: 'bytes'`).
 *
 * @returns {Promise<void>}
 */
async function webByteStreams () {
  const byteStreamOpts = { type: 'bytes' }

  await webStreams(byteStreamOpts)
}
/**
 * Benchmark: a plain async-iterator duplex - an async generator as the source
 * and an async function draining it as the sink - joined with `pipe`.
 *
 * @returns {Promise<void>}
 * @throws {Error} 'Short read' if the collected chunk count differs from the
 *   generated chunk count
 */
async function itDuplex () {
  const chunks = createData()
  const received = []

  async function * emitChunks () {
    for (const chunk of chunks) {
      yield chunk
    }
  }

  async function collect (source) {
    for await (const chunk of source) {
      received.push(chunk)
    }
  }

  const duplex = {
    source: emitChunks(),
    sink: collect
  }

  await pipe(duplex, duplex)

  if (received.length !== chunks.length) {
    throw new Error('Short read')
  }
}
/**
 * Benchmark: expose a Node.js Readable/Writable pair as an async-iterator
 * duplex and join the two halves with `pipe`.
 *
 * The sink writes every chunk to the Writable, waiting for 'drain' whenever
 * `write` signals backpressure. Once the source is exhausted it ends the
 * Writable and waits for 'finish', so the stream is not left open and the
 * measured work matches `nodeStreams` (which also waits for 'finish').
 *
 * @returns {Promise<void>}
 * @throws {Error} 'Short read' if the collected chunk count differs from the
 *   generated chunk count
 */
async function nodeStreamsAsItDuplex () {
  const data = createData()
  const output = []
  const readable = Readable.from(data)
  const writable = new Writable({
    write: (chunk, enc, cb) => {
      output.push(chunk)
      cb()
    }
  })
  const it = {
    source: readable,
    sink: async (source) => {
      for await (const buf of source) {
        const writeMore = writable.write(buf)
        if (!writeMore) {
          // respect backpressure: pause until the internal buffer has drained
          await new Promise((resolve) => {
            writable.once('drain', resolve)
          })
        }
      }
      // signal end-of-input and wait until every write has been flushed;
      // the listener is attached before end() so 'finish' cannot be missed
      await new Promise((resolve) => {
        writable.once('finish', resolve)
        writable.end()
      })
    }
  }
  await pipe(it, it)
  if (output.length !== data.length) {
    throw new Error('Short read')
  }
}
/**
 * Benchmark: expose a web ReadableStream/WritableStream pair as an
 * async-iterator duplex and join the two halves with `pipe`.
 *
 * The source is an async generator that drains the readable through a reader;
 * the sink writes each chunk through a writer (awaiting `ready` before each
 * write) and closes the writer once the source is exhausted.
 *
 * @param {object} [readableOpts] - extra underlying-source fields; they are
 *   spread after `pull`, so they can add to or override the source definition
 *   (e.g. `{ type: 'bytes' }`)
 * @returns {Promise<void>}
 * @throws {Error} 'Short read' if the collected chunk count differs from the
 *   generated chunk count
 */
async function webStreamsAsItDuplex (readableOpts = {}) {
  const chunks = createData()
  const received = []
  let cursor = 0

  const webReadable = new ReadableStream({
    pull (controller) {
      if (cursor !== chunks.length) {
        controller.enqueue(chunks[cursor])
        cursor += 1
        return
      }

      // everything has been enqueued
      controller.close()
    },
    ...readableOpts
  })
  const webWritable = new WritableStream({
    write (chunk) {
      received.push(chunk)
    }
  })

  // the reader is acquired lazily, on the first pull from the generator
  async function * readAll () {
    const reader = webReadable.getReader()

    for (;;) {
      const result = await reader.read()

      if (result.done === true) {
        return
      }

      yield result.value
    }
  }

  async function writeAll (source) {
    const writer = webWritable.getWriter()

    for await (const chunk of source) {
      await writer.ready
      await writer.write(chunk)
    }

    await writer.close()
  }

  const duplex = {
    source: readAll(),
    sink: writeAll
  }

  await pipe(duplex, duplex)

  if (received.length !== chunks.length) {
    throw new Error('Short read')
  }
}
/**
 * Benchmark: same as `webStreamsAsItDuplex`, but with the readable configured
 * as a byte stream (`type: 'bytes'`).
 *
 * @returns {Promise<void>}
 */
async function webByteStreamsAsItDuplex () {
  const byteStreamOpts = { type: 'bytes' }

  await webStreamsAsItDuplex(byteStreamOpts)
}
// Benchmark registry: display name -> async benchmark function. The entry
// order here is the order in which the benchmarks are warmed up and timed.
const tests = Object.fromEntries([
  ['node streams', nodeStreams],
  ['node streams as web streams', nodeStreamsAsWebStreams],
  ['web streams', webStreams],
  ['web byte streams', webByteStreams],
  ['duplex async iterators', itDuplex],
  ['node streams as duplex async iterator', nodeStreamsAsItDuplex],
  ['web streams as duplex async iterator', webStreamsAsItDuplex],
  ['web byte streams as duplex async iterator', webByteStreamsAsItDuplex]
])
// warmup: run every benchmark ITERATIONS times untimed before measuring
for (const test of Object.values(tests)) {
  for (let i = 0; i < ITERATIONS; i++) {
    await test()
  }
}

const results = []

// test: time ITERATIONS sequential runs of each benchmark
for (const [name, test] of Object.entries(tests)) {
  const start = Date.now()
  for (let i = 0; i < ITERATIONS; i++) {
    await test()
  }
  // total elapsed wall-clock time for all iterations, in milliseconds
  const time = Date.now() - start
  results.push({
    'Name': name,
    'Time': time,
    // runs per second: iterations divided by elapsed seconds. Dividing
    // `time` by ITERATIONS would give milliseconds per run instead, which
    // does not match the column name. Yields Infinity if `time` is 0.
    'Ops/s': Math.round(ITERATIONS / (time / 1000))
  })
}

// print results, fastest (lowest total time) first
console.table(results.sort((a, b) => a.Time - b.Time))
@achingbrain
Copy link
Author

achingbrain commented Jun 23, 2023

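Results on node v18.16.0 (in the tables below, `Time` is the total in ms for 1000 runs and the `Ops/s` column was computed as `Time / ITERATIONS`, i.e. milliseconds per run, so lower is better):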

% node index.js
┌─────────┬─────────────────────────────────────────────┬──────┬───────┐
│ (index) │                    Name                     │ Time │ Ops/s │
├─────────┼─────────────────────────────────────────────┼──────┼───────┤
│    0    │   'node streams as duplex async iterator'   │ 1051 │ 1.051 │
│    1    │               'node streams'                │ 1104 │ 1.104 │
│    2    │          'duplex async iterators'           │ 1131 │ 1.131 │
│    3    │                'web streams'                │ 2208 │ 2.208 │
│    4    │   'web streams as duplex async iterator'    │ 2317 │ 2.317 │
│    5    │             'web byte streams'              │ 2623 │ 2.623 │
│    6    │ 'web byte streams as duplex async iterator' │ 2827 │ 2.827 │
│    7    │        'node streams as web streams'        │ 3172 │ 3.172 │
└─────────┴─────────────────────────────────────────────┴──────┴───────┘

@achingbrain
Copy link
Author

achingbrain commented Jun 26, 2023

Results on node v20.2.0 show a bigger performance gap:

% node index.js
┌─────────┬─────────────────────────────────────────────┬──────┬───────┐
│ (index) │                    Name                     │ Time │ Ops/s │
├─────────┼─────────────────────────────────────────────┼──────┼───────┤
│    0    │               'node streams'                │ 500  │  0.5  │
│    1    │          'duplex async iterators'           │ 660  │ 0.66  │
│    2    │   'node streams as duplex async iterator'   │ 744  │ 0.744 │
│    3    │                'web streams'                │ 1713 │ 1.713 │
│    4    │   'web streams as duplex async iterator'    │ 1797 │ 1.797 │
│    5    │             'web byte streams'              │ 1988 │ 1.988 │
│    6    │ 'web byte streams as duplex async iterator' │ 2061 │ 2.061 │
│    7    │        'node streams as web streams'        │ 2151 │ 2.151 │
└─────────┴─────────────────────────────────────────────┴──────┴───────┘

@achingbrain
Copy link
Author

This lives here now for better visibility: https://github.com/ipfs-shipyard/js-streams-benchmark

@achingbrain
Copy link
Author

Results on node v20.12.0 - web streams are catching up:

% node index.js
┌─────────┬─────────────────────────────────────────────┬──────┬───────┐
│ (index) │ Name                                        │ Time │ Ops/s │
├─────────┼─────────────────────────────────────────────┼──────┼───────┤
│ 0       │ 'node streams'                              │ 404  │ 0.404 │
│ 1       │ 'duplex async iterators'                    │ 507  │ 0.507 │
│ 2       │ 'node streams as duplex async iterator'     │ 745  │ 0.745 │
│ 3       │ 'web streams'                               │ 839  │ 0.839 │
│ 4       │ 'web streams as duplex async iterator'      │ 1214 │ 1.214 │
│ 5       │ 'web byte streams'                          │ 1313 │ 1.313 │
│ 6       │ 'node streams as web streams'               │ 1394 │ 1.394 │
│ 7       │ 'web byte streams as duplex async iterator' │ 1633 │ 1.633 │
└─────────┴─────────────────────────────────────────────┴──────┴───────┘

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment