Skip to content

Instantly share code, notes, and snippets.

@Raynos
Created August 4, 2012 01:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Raynos/3253232 to your computer and use it in GitHub Desktop.
Save Raynos/3253232 to your computer and use it in GitHub Desktop.
module.exports = require("mux-demux-shoe")
var shoe = require("mux-demux-shoe")
, through = require("through")
, multiStreamRegExp = /(multi-stream-)([\w\W]*)/
, streams = {}
module.exports = createShoeConnection
function createShoeConnection(callback) {
var sock = shoe(proxyConnections)
return sock
function proxyConnections(stream) {
var meta = stream.meta
if ("string" !== typeof meta) {
return callback(stream)
}
var regExpResult = meta.match(multiStreamRegExp)
if (regExpResult === null) {
return callback(stream)
}
stream.on("error", function () {
/* mdm should not error when disconnect */
})
var name = regExpResult[2]
, communicationStream = streams[name]
if (!communicationStream) {
communicationStream = streams[name] = through()
}
communicationStream.pipe(stream).pipe(communicationStream, {
end: false
})
}
}
var shoe = require("shoe")
, MuxDemux = require("mux-demux")
, curry = require("ap").curry
, DuplexStream = require("duplex")
, reemit = curry(function (stream, message) {
stream.emit(message)
})
module.exports = createOuterStream
function createOuterStream(uri, reconnect) {
var mdm = MuxDemux()
createShoeStream(uri, reconnect, mdm)
if (reconnect) {
var _createStream = mdm.createStream
mdm.createStream = createStream
}
return mdm
function createStream(meta) {
var duplex = DuplexStream(write, end)
, stream = createMdmStream(meta, duplex)
mdm.on("disconnect", function () {
duplex.pause()
})
mdm.on("connect", function () {
createMdmStream(meta, duplex)
duplex.resume()
})
return duplex
function write(data) {
this.sendData(data)
}
function end() {
this.sendEnd()
}
function createMdmStream(meta, duplex) {
stream && stream.destroy()
stream = _createStream(meta, duplex)
duplex.pipe(stream).pipe(duplex, {
end: false
})
}
}
}
function createShoeStream(uri, reconnect, mdm) {
var stream = shoe(uri)
, emitConnect = reemit(mdm, "connect")
stream.on("connect", emitConnect)
mdm.pipe(stream).pipe(mdm, {
end: false
})
if (reconnect) {
stream.on("end", onend)
}
return mdm
function onend() {
stream.removeListener("end", onend)
stream.removeListener("connect", emitConnect)
mdm.emit("disconnect")
createShoeStream(uri, reconnect, mdm)
}
}
var shoe = require("shoe")
, MuxDemux = require("mux-demux")
module.exports = createShoeConnection
function createShoeConnection(options, callback) {
if (typeof options === 'function') {
callback = options
options = {}
}
return shoe(options, interceptStream)
function interceptStream(stream) {
var mdm = MuxDemux()
mdm.on("connection", callback)
stream.pipe(mdm).pipe(stream)
stream.on("error", function (err) {
/* ignore mdm errors */
})
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment