Skip to content

Instantly share code, notes, and snippets.

@bellbind
Last active December 11, 2022 14:46
Show Gist options
  • Save bellbind/050f69714a968695f895d4996217dc7d to your computer and use it in GitHub Desktop.
Save bellbind/050f69714a968695f895d4996217dc7d to your computer and use it in GitHub Desktop.
[js-ipfs/libp2p] dial to read events from infinite sending from handler, then close read
// wrapping libp2p stream (mplex/stream)
// - stream.source: AsyncIterable<Uint8Array>
// - stream.sink: (Iterable<Uint8Array> | AsyncIterable<Uint8Array>) => Promise<undefined>
// - stream.close, stream.closeRead, stream.closeWrite, stream.abort, stream.reset
const newQueue = () => {
const [gets, polls] = [[], []];
const next = () => new Promise(
get => polls.length > 0 ? polls.shift()(get) : gets.push(get));
const poll = () => new Promise(
poll => gets.length > 0 ? poll(gets.shift()) : polls.push(poll));
const push = value => poll().then(get => get({value, done: false}));
const close = () => poll().then(get => get({done: true}));
return {[Symbol.asyncIterator]() {return this;}, next, push, close};
}
const payload = (u8a, type = 0) => {
const ret = new Uint8Array(u8a.length + 1);
ret[0] = type;
ret.set(u8a, 1);
return ret;
}
export const newClosableStream = stream => {
const eventTarget = new EventTarget();
let sinkFinished = false, sourceFinished = false;
// send to remote
const writeQueue = newQueue();
const writing = async () => {
return stream.sink((async function* () {
let closed = false, finished = false;
while (!closed || !finished) {
const {done, value: {type, value}} = await writeQueue.next();
if (type === "data") {
yield payload(value, 0);
} else if (type === "close") {
yield Uint8Array.from([1]);
closed = true;
} else if (type === "finished") {
yield Uint8Array.from([2]);
finished = true;
}
}
stream.closeWrite();
//console.info("[stream.closeWrite()]");
})());
};
const writingPromise = writing().catch(error => {
eventTarget.dispatchEvent(new CustomEvent("error", {detail: error}));
});
// receive from remote
const readQueue = newQueue();
let remoteClosed = false;
const reading = async () => {
for await (const bl of stream.source) {
if (sourceFinished) break;
const u8a = bl.slice();
//console.log("type", u8a[0], u8a);
if (u8a[0] === 0) readQueue.push({type: "data", value: u8a.slice(1)});
if (u8a[0] === 1) remoteClosed = true;
if (u8a[0] === 2) readQueue.push({type: "finished"});
}
readQueue.push({type: "finished"});
stream.closeRead();
//console.info("[stream.closeRead()]");
};
const readingPromise = reading().catch(error => {
// (ipfs-0.65.0) may spawn `Error: Socket read timeout`
eventTarget.dispatchEvent(new CustomEvent("error", {detail: error}));
});
// wrapped stream.source
const source = (async function* () {
for (;;) {
const {done, value: {type, value}} = await readQueue.next();
if (type === "data") yield value;
if (type === "finished") break;
}
writeQueue.push({type: "close"});
sourceFinished = true;
})();
// wrapped stream.sink
const sink = async iter => {
for await (const value of iter) {
if (remoteClosed) break;
writeQueue.push({type: "data", value});
}
writeQueue.push({type: "finished"});
sinkFinished = true;
};
// send close to read;
const closeRead = async () => {
writeQueue.push({type: "close"});
sourceFinished = true;
};
const closeWrite = async () => {
writeQueue.push({type: "finished"});
sinkFinished = true;
};
// wrapped stream
return Object.assign(eventTarget, {
source, sink, closeRead, closeWrite,
close() {return Promise.all([closeRead(), closeWrite()]);},
reset() {return stream.reset();},
abort(...args) {return stream.abort(...args);},
});
};
import * as fs from "node:fs";
import * as IPFS from "ipfs-core";
import {newClosableStream} from "./closable-stream.js";
// setup two IPFS nodes
const repo1 = "./test-repo1", repo2 = "./test-repo2";
fs.rmSync(repo1, {recursive: true, force: true});
fs.rmSync(repo2, {recursive: true, force: true});
const config = {
Addresses: {
Swarm: [
"/ip4/0.0.0.0/tcp/0",
],
},
};
const node1 = await IPFS.create({
repo: repo1,
config,
});
const id1 = await node1.id();
console.info("[node1 id]", id1.id.toJSON());
console.info("[node1 address]", id1.addresses[0].toJSON());
const node2 = await IPFS.create({
repo: repo2,
config,
});
const id2 = await node2.id();
console.info("[node2 id]", id2.id.toJSON());
console.info("[node2 address]", id2.addresses[0].toJSON());
// connect and ping as p2p-circuit
await node2.swarm.connect(id1.addresses[0].toJSON());
await node2.libp2p.ping(`/p2p/${id2.id.toJSON()}/p2p-circuit/p2p/${id1.id.toJSON()}`);
// handler: serve inifinitely
const protocol = "/example-protocol/1.0";
{
const handler = ({connection, stream}) => {
const cStream = newClosableStream(stream);
let count = 0;
cStream.sink((async function* () {
// while (true) {
for (let i = 0; i < 20; i++) {
// infinite stream
yield new TextEncoder().encode(`count: ${++count}`);
console.log(`[Served] ${count}`);
await new Promise(f => setTimeout(f, 50));
}
})());
};
await node1.libp2p.handle(protocol, handler);
}
// dialer: accept several messages, then stop
{
const stream = newClosableStream(await node2.libp2p.dialProtocol(`/p2p/${id1.id.toJSON()}`, protocol));
stream.addEventListener("error", async ev => {
console.log(ev.detail);
});
let i = 0;
for await (const bl of stream.source) {
console.log(new TextDecoder().decode(bl.slice().slice()));
if (++i === 10) break;
}
await stream.close();
}
const sec = 45;
console.log(`[wait ${sec}sec for spawn Socket read timeout]`);
await new Promise(f => setTimeout(f, sec * 1000)); // > Socket read timeout
// reconnect after socket read timeout
console.log("[connect and ping again]");
await node2.swarm.connect(id1.addresses[0].toJSON());
await node2.libp2p.ping(`/p2p/${id2.id.toJSON()}/p2p-circuit/p2p/${id1.id.toJSON()}`);
{// dial again
const stream = newClosableStream(await node2.libp2p.dialProtocol(`/p2p/${id1.id.toJSON()}`, protocol));
stream.addEventListener("error", async ev => {
console.log(ev.detail);
});
let i = 0;
for await (const bl of stream.source) {
console.log(new TextDecoder().decode(bl.slice().slice()));
if (++i === 10) break;
}
await stream.close();
}
console.log("[stop nodes]");
await node1.stop();
await node2.stop();
console.log("[stopped]");
{
"type": "module",
"dependencies": {
"ipfs": "^0.65.0"
}
}
@bellbind
Copy link
Author

bellbind commented Nov 30, 2022

result:

$ npm i
$ node close-from-dialer.mjs
generating Ed25519 keypair...
to get started, enter:

	jsipfs cat /ipfs/QmRaaUwTNfwgFZpeUy8qrZwrp2dY4kCKmmB5xEqvH3vtD1/readme

Swarm listening on /ip4/127.0.0.1/tcp/55846/p2p/12D3KooWDWSu4Pdvit9uVKNJCXJU8n6RDy112rMnoYj1yxReQav9
Swarm listening on /ip4/192.168.10.3/tcp/55846/p2p/12D3KooWDWSu4Pdvit9uVKNJCXJU8n6RDy112rMnoYj1yxReQav9
[node1 id] 12D3KooWDWSu4Pdvit9uVKNJCXJU8n6RDy112rMnoYj1yxReQav9
[node1 address] /ip4/127.0.0.1/tcp/55846/p2p/12D3KooWDWSu4Pdvit9uVKNJCXJU8n6RDy112rMnoYj1yxReQav9
generating Ed25519 keypair...
to get started, enter:

	jsipfs cat /ipfs/QmRaaUwTNfwgFZpeUy8qrZwrp2dY4kCKmmB5xEqvH3vtD1/readme

Swarm listening on /ip4/127.0.0.1/tcp/55848/p2p/12D3KooWPJSBcFjVDynJ1eeiXQa4XRQgff5LrYyWCMWFPqDkfWdY
Swarm listening on /ip4/192.168.10.3/tcp/55848/p2p/12D3KooWPJSBcFjVDynJ1eeiXQa4XRQgff5LrYyWCMWFPqDkfWdY
[node2 id] 12D3KooWPJSBcFjVDynJ1eeiXQa4XRQgff5LrYyWCMWFPqDkfWdY
[node2 address] /ip4/127.0.0.1/tcp/55848/p2p/12D3KooWPJSBcFjVDynJ1eeiXQa4XRQgff5LrYyWCMWFPqDkfWdY
[Served] 1
count: 1
[Served] 2
count: 2
[Served] 3
count: 3
[Served] 4
count: 4
[Served] 5
count: 5
[Served] 6
count: 6
[Served] 7
count: 7
[Served] 8
count: 8
[Served] 9
count: 9
[Served] 10
count: 10
[wait 45sec for spawn Socket read timeout]
[ping again]
[Served] 1
count: 1
[Served] 2
count: 2
[Served] 3
count: 3
[Served] 4
count: 4
[Served] 5
count: 5
[Served] 6
count: 6
[Served] 7
count: 7
[Served] 8
count: 8
[Served] 9
count: 9
[Served] 10
count: 10
[stop nodes]
[stopped]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment