Skip to content

Instantly share code, notes, and snippets.

@rumkin
Last active May 6, 2020 16:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rumkin/f18860b13a041891c5af11a919b07b98 to your computer and use it in GitHub Desktop.
Save rumkin/f18860b13a041891c5af11a919b07b98 to your computer and use it in GitHub Desktop.
LibP2P Streams
import {promises as fs} from 'fs'
import Libp2p from 'libp2p'
import Mplex from 'libp2p-mplex'
import Peer from 'peer-id'
import PeerInfo from 'peer-info'
import Secio from 'libp2p-secio'
import Websockets from 'libp2p-websockets'
import pipe from 'it-pipe'
/**
 * Client entry point.
 *
 * Loads the client and server identities from `client.json` and
 * `server.json`, starts a libp2p node configured with the Websockets
 * transport, Secio encryption and the Mplex muxer, runs one chat exchange
 * against the server peer and then stops the node.
 *
 * @returns {Promise<void>} settles once the node has been stopped; rejects
 *   with the error of the failed step (loading, start, dial or stop).
 */
export default async function main() {
  const clientPeer = await initPeer('client.json')
  const serverPeer = await initPeer('server.json')

  const clientInfo = new PeerInfo(clientPeer)
  clientInfo.protocols.add('/chat/1.0.0')

  // No PeerInfo is built for the server here: dialChatProtocol() creates its
  // own from the bare peer id and attaches the dial address itself.
  const node = await Libp2p.create({
    peerInfo: clientInfo,
    modules: {
      transport: [Websockets],
      connEncryption: [Secio],
      streamMuxer: [Mplex],
    },
  })

  await node.start()
  try {
    await dialChatProtocol(node, serverPeer)
  } finally {
    // Stop the node even when the dial or the exchange fails, so a failed
    // run does not leave a started node behind.
    await node.stop()
  }
}
/**
 * Dials the remote peer over the local websocket address, opens a
 * `/chat/1.0.0` stream, sends "Hello" and "There", collects every chunk
 * read back from the stream and logs the collected chunks.
 *
 * @param {object} node - started libp2p node used to dial.
 * @param {object} remotePeer - peer id of the remote side; wrapped in a
 *   PeerInfo together with the hard-coded dial address.
 * @returns {Promise<void>} settles after the stream and the connection have
 *   been closed; rejects if dialing, opening the stream, the exchange or the
 *   cleanup fails.
 */
async function dialChatProtocol(node, remotePeer) {
  const remote = new PeerInfo(remotePeer);
  remote.multiaddrs.add("/ip4/127.0.0.1/tcp/8080/ws/");

  const conn = await node.dial(remote);
  try {
    const { stream } = await conn.newStream("/chat/1.0.0");
    try {
      const result = await pipe(
        ["Hello", "There"],
        stream,
        async (source) => {
          const chunks = [];
          for await (const chunk of source) {
            chunks.push(chunk);
          }
          return chunks;
        }
      );
      console.log("result: ", result);
    } finally {
      // Release the stream even when the exchange throws.
      await stream.close();
    }
  } finally {
    // Always drop the connection, including when newStream() itself fails.
    await conn.close();
  }
}
/**
 * Reads a JSON file and hands the parsed object to `Peer.createFromJSON`.
 *
 * @param {string} filepath - path of the JSON file to read (as UTF-8).
 * @returns {Promise<*>} whatever `Peer.createFromJSON` resolves to
 *   (presumably a peer id instance - confirm against the peer-id docs).
 */
async function initPeer(filepath) {
  const contents = await fs.readFile(filepath, {encoding: 'utf8'})
  const keyData = JSON.parse(contents)
  return Peer.createFromJSON(keyData)
}
import {promises as fs} from 'fs'
import {once} from 'events'
import Libp2p from 'libp2p'
import Mplex from 'libp2p-mplex'
import Peer from 'peer-id'
import PeerInfo from 'peer-info'
import Secio from 'libp2p-secio'
import Websockets from 'libp2p-websockets'
import pipe from 'it-pipe'
/**
 * Reads a JSON file and hands the parsed object to `Peer.createFromJSON`.
 *
 * @param {string} [peerJson='peer.json'] - path of the JSON file to read
 *   (as UTF-8).
 * @returns {Promise<*>} whatever `Peer.createFromJSON` resolves to
 *   (presumably a peer id instance - confirm against the peer-id docs).
 */
async function initPeer(peerJson = 'peer.json') {
  const text = await fs.readFile(peerJson, 'utf8')
  const descriptor = JSON.parse(text)
  return Peer.createFromJSON(descriptor)
}
/**
 * Server entry point.
 *
 * Loads the identity from `server.json`, logs its id, starts a libp2p node
 * listening on `/ip4/0.0.0.0/tcp/8080/ws` that serves `/chat/1.0.0`, waits
 * for SIGINT and then stops the node.
 *
 * @returns {Promise<void>} settles after the node has been stopped and the
 *   farewell line has been logged.
 */
export default async function main() {
  const identity = await initPeer('server.json')
  const {id} = identity.toJSON()
  console.log('Peer id', id)

  const info = new PeerInfo(identity)
  info.multiaddrs.add('/ip4/0.0.0.0/tcp/8080/ws')
  info.protocols.add('/chat/1.0.0')

  const modules = {
    transport: [Websockets],
    connEncryption: [Secio],
    streamMuxer: [Mplex],
  }
  const node = await Libp2p.create({peerInfo: info, modules})

  // Per-stream handler: logs the protocol and reports stream failures
  // instead of letting the rejection go unhandled.
  function onChatStream({protocol, stream}) {
    console.log(protocol)
    const reportFailure = (error) => {
      console.error('%s [error]:', protocol, error)
    }
    handleChatStream(stream).catch(reportFailure)
  }
  node.handle('/chat/1.0.0', onChatStream)

  await node.start()
  // Keep serving until the process receives SIGINT.
  await once(process, 'SIGINT')
  await node.stop()
  console.log('Bye!')
}
/**
 * Logs every decoded chunk read from a chat stream, then logs `DONE` once
 * the stream is exhausted.
 *
 * The stream is piped through `fromBufferList` and `toString()` before
 * being consumed.
 *
 * @param {object} stream - stream to read from.
 * @returns {Promise<void>} settles when the stream ends; rejects if reading
 *   or decoding throws.
 */
async function handleChatStream(stream) {
  const messages = pipe(stream, fromBufferList, toString())

  for await (const message of messages) {
    console.log(message)
  }

  console.log('DONE')
}
/**
 * Flattens a stream of buffer lists into a stream of individual buffers.
 *
 * Each incoming chunk that carries a `_bufs` array has its entries yielded
 * one by one. A chunk without such an array (e.g. a plain Buffer or
 * Uint8Array) is yielded unchanged instead of failing on the missing field.
 *
 * NOTE(review): `_bufs` looks like an internal field of the buffer-list
 * implementation used by the stream - confirm whether a public accessor
 * should be used instead.
 *
 * @param {AsyncIterable<object>|Iterable<object>} source - chunks to flatten.
 * @yields {*} the individual buffers, in arrival order.
 */
async function * fromBufferList(source) {
  for await (const list of source) {
    const buffers = list._bufs
    if (!Array.isArray(buffers)) {
      // Not a buffer list: pass the chunk through as a single buffer.
      yield list
      continue
    }
    for (const item of buffers) {
      yield item
    }
  }
}
/**
 * Creates a transform that decodes a stream of binary chunks into strings.
 *
 * Decoding is done in streaming mode, so a multi-byte character split
 * across two chunks is emitted once it is complete. Empty strings are never
 * yielded, neither for a chunk that decodes to nothing nor for the final
 * flush.
 *
 * @param {string} [encoding='utf8'] - encoding label passed to TextDecoder.
 * @returns {function(AsyncIterable<Uint8Array>): AsyncGenerator<string>}
 *   async generator function mapping binary chunks to decoded strings.
 */
function toString(encoding = 'utf8') {
  return async function * toString(source) {
    const decoder = new TextDecoder(encoding)
    for await (const chunk of source) {
      const text = decoder.decode(chunk, {
        stream: true,
      })
      if (text.length) {
        yield text
      }
    }
    // Flush bytes still buffered by the decoder (e.g. a truncated sequence).
    // For well-formed input this is empty and must not be emitted.
    const tail = decoder.decode()
    if (tail.length) {
      yield tail
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment