Skip to content

Instantly share code, notes, and snippets.

@daviddias
Created January 18, 2017 18:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save daviddias/f6a095ce4adc87345c6eb0dae32f5ef7 to your computer and use it in GitHub Desktop.
Save daviddias/f6a095ce4adc87345c6eb0dae32f5ef7 to your computer and use it in GitHub Desktop.
/* eslint-env mocha */
/* eslint max-nested-callbacks: ["error", 8] */
'use strict'
const chai = require('chai')
chai.use(require('chai-checkmark'))
const expect = chai.expect
const pair = require('pull-pair/duplex')
const pull = require('pull-stream')
const parallel = require('async/parallel')
const series = require('async/series')
const Tcp = require('libp2p-tcp')
const multiaddr = require('multiaddr')
const mh = multiaddr('/ip4/127.0.0.1/tcp/10000')
/**
 * Half-closes `stream` by feeding it an empty source, then waits for the
 * readable side of `stream` to end.
 *
 * @param {Object} stream - duplex pull-stream to close
 * @param {Function} callback - invoked once with the value `pull.onEnd`
 *   reports when the stream ends, so callers can assert on it
 */
function closeAndWait (stream, callback) {
  pull(
    pull.empty(),
    stream,
    // Forward the end value instead of dropping it; otherwise a caller's
    // `expect(err).to.not.exist` can never fail.
    pull.onEnd((err) => callback(err))
  )
}
module.exports = (common) => {
describe.only('close', () => {
let muxer
beforeEach((done) => {
common.setup((err, _muxer) => {
if (err) return done(err)
muxer = _muxer
done()
})
})
it('closing underlying socket closes streams (tcp)', (done) => {
expect(2).checks(done)
const tcp = new Tcp()
const tcpListener = tcp.createListener((conn) => {
const listener = muxer.listener(conn)
listener.on('stream', (stream) => {
pull(stream, stream)
})
})
tcpListener.listen(mh, () => {
const dialerConn = tcp.dial(mh, setTimeout(() => tcpListener.close(), 2000))
const dialerMuxer = muxer.dialer(dialerConn)
const s1 = dialerMuxer.newStream(() => {
pull(
s1,
pull.onEnd((err) => {
// console.log('s1', err)
expect(err).to.exist.mark()
})
)
})
const s2 = dialerMuxer.newStream(() => {
pull(
s2,
pull.onEnd((err) => {
// console.log('s2', err)
expect(err).to.exist.mark()
})
)
})
})
})
it('closing one of the muxed streams doesn\'t close others', (done) => {
const p = pair()
const dialer = muxer.dialer(p[0])
const listener = muxer.listener(p[1])
let counter = 0
function check () {
if (++counter === 6){
done()
}
}
const conns = []
listener.on('stream', (stream) => {
check()
pull(stream, stream)
})
for (let i = 0; i < 5; i++) {
conns.push(dialer.newStream())
}
conns.forEach((conn, i) => {
if (i === 1) {
closeAndWait(conn, (err) => {
expect(err).to.not.exist
console.log('Happen once')
check()
})
} else {
pull(
conn,
pull.onEnd(() => {
throw new Error('should not end')
})
)
}
})
})
it.skip('closing on spdy doesn\'t close until all the streams that are being muxed are closed', (done) => {
const p = pair()
const dialer = muxer.dial(p[0])
const listener = muxer.listen(p[1])
expect(15).checks(done)
const conns = []
const count = []
for (let i = 0; i < 5; i++) {
count.push(i)
}
series(count.map((i) => (cb) => {
parallel([
(cb) => listener.once('stream', (stream) => {
console.log('pipe')
expect(stream).to.exist.mark()
pull(stream, stream)
cb()
}),
(cb) => conns.push(dialer.newStream(cb))
], cb)
}), (err) => {
if (err) return done(err)
conns.forEach((conn, i) => {
pull(
pull.values([Buffer('hello')]),
pull.asyncMap((val, cb) => {
setTimeout(() => {
cb(null, val)
}, i * 10)
}),
pull.through((val) => console.log('send', val)),
conn,
pull.through((val) => console.log('recv', val)),
pull.collect((err, data) => {
console.log('end', i)
expect(err).to.not.exist.mark()
expect(data).to.be.eql([Buffer('hello')]).mark()
})
)
})
listener.on('close', () => {
console.log('closed listener')
})
dialer.end(() => {
console.log('CLOSED')
})
})
})
})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment