Skip to content

Instantly share code, notes, and snippets.

@dominictarr
Last active December 18, 2015 13:59
Show Gist options
  • Save dominictarr/204cbf77033d0de1ef00 to your computer and use it in GitHub Desktop.
Save dominictarr/204cbf77033d0de1ef00 to your computer and use it in GitHub Desktop.
dist
var net = require('net')
var serialize = require('stream-serializer')()
var through = require('message-stream')
var opts = require('optimist').argv
var EventEmitter = require('events').EventEmitter
var port = 8989
var myIp = require('my-local-ip')()
var chat = new EventEmitter()
chat.message = function (mes) {
chat.emit('message',
{ip: myIp, message: mes, name: opts.name, ts: Date.now()})
}
chat.on('message', function (data, id) {
console.log(data)
})
process.stdin.on('data', function (d) {
chat.message(d.toString())
})
chat.createStream = function () {
var ts = through(function (data) {
chat.emit('message', data, ts)
})
//reemit all data.
chat.on('message', function (data, source) {
if(source != ts) ts.queue(data)
})
return ts
}
if(opts.server) {
net.createServer(function (stream) {
console.log("connect:", stream.remoteAddress)
stream.pipe(chat.createStream()).pipe(stream)
}).listen(port, myIp)
} else {
var stream = net.connect(port, myIp)
stream.pipe(chat.createStream()).pipe(stream)
stream.on('close', function () {
console.log('Disconnected!')
process.exit(1)
})
}
var net = require('net')
var serialize = require('stream-serializer')()
var through = require('message-stream')
var opts = require('optimist').argv
var EventEmitter = require('events').EventEmitter
var port = 8989
var myIp = require('my-local-ip')()
var chat = new EventEmitter()
chat.message = function (mes) {
chat.emit('message',
{ip: myIp, message: mes, name: opts.name, ts: Date.now()})
}
chat.on('message', function (data, id) {
console.log(data)
})
process.stdin.on('data', function (d) {
chat.message(d.toString())
})
chat.createStream = function () {
var ts = through(function (data) {
chat.emit('message', data, ts)
})
//reemit all data.
chat.on('message', function (data, source) {
if(source != ts) ts.queue(data)
})
return ts
}
if(opts.server) {
net.createServer(function (stream) {
console.log("connect:", stream.remoteAddress)
stream.pipe(chat.createStream()).pipe(stream)
}).listen(port, myIp)
} else {
var stream, ts
function connect () {
stream = net.connect(port, myIp, {allowHalfDuplex: false})
//add this bit when we want to print a connected message.
stream.pipe(chat.createStream()).pipe(stream)
stream.on('close', reconnect)
stream.on('error', reconnect)
}
function reconnect () {
stream.removeAllListeners()
console.log('reconnecting...')
setTimeout(connect, 1000 + Math.random()*1000)
}
connect()
}
var net = require('net')
var through = require('message-stream')
var opts = require('optimist').argv
var EventEmitter = require('events').EventEmitter
var port = 8989
var myIp = require('my-local-ip')()
var messages = []
var chat = new EventEmitter()
function latest () {
var last = 0
messages.forEach(function (m) {
if(last < m.ts)
last = m.ts
})
return last
}
chat.message = function (mes) {
chat.emit('message', {
ip: myIp,
message: mes,
name: opts.name,
ts: Date.now()
}, true)
}
chat.on('message', function (data, local) {
messages.push(data)
console.log(data)
})
process.stdin.on('data', function (d) {
chat.message(d.toString())
})
chat.createStream = function () {
var ts = through(function (data) {
if(null != data.since) {
messages.filter(function (e) {
if(e.ts > data.since)
ts.queue(e)
})
} else
chat.emit('message', data, ts)
})
//reemit all data.
chat.on('message', function (data, source) {
if(source != ts) ts.queue(data)
})
ts.queue({since: latest()})
return ts
}
if(opts.server) {
net.createServer(function (stream) {
console.log("connect:", stream.remoteAddress)
stream.on('data', function (e) {
console.log('e', ''+e)
})
stream.pipe(chat.createStream()).pipe(stream)
}).listen(port, myIp)
} else {
var stream, ts
function connect () {
stream = net.connect(port, myIp, {allowHalfDuplex: false})
//add this bit when we want to print a connected message.
stream.pipe(chat.createStream()).pipe(stream)
stream.on('close', reconnect)
stream.on('error', reconnect)
}
function reconnect () {
stream.removeAllListeners()
console.log('reconnecting...')
setTimeout(connect, 1000 + Math.random()*1000)
}
connect()
}
var net = require('net')
var MessageStream = require('message-stream')
var opts = require('optimist').argv
var EventEmitter = require('events').EventEmitter
var port = 8989
var myIp = require('my-local-ip')()
var messages = []
function latest () {
var last = 0
messages.forEach(function (m) {
if(last < m.ts)
last = m.ts
})
return last
}
function randomIp () {
var ips = {}
messages.forEach(function (e) {
ips[e.ip] = true
})
ips = Object.keys(ips)//.filter(function (e) { return e != myIp })
return ips[~~(ips.length*Math.random())]
}
function getKnown () {
var known = {}
messages.forEach(function (m) {
if(!known[m.ip] || known[m.ip] < m.ts)
known[m.ip] = m.ts
})
return known
}
var chat = new EventEmitter()
chat.message = function (mes) {
chat.emit('message', {
ip: myIp,
message: mes,
name: opts.name,
ts: Date.now()
}, true)
}
chat.on('message', function (m) {
messages.push(m)
console.log(m)
})
process.stdin.on('data', function (d) {
chat.message(d.toString())
})
chat.createStream = function () {
var known = {}
var ms = MessageStream(function (data) {
console.log('>>', data)
if(data.vector != null) {
messages.forEach(function (e) {
known = data.vector
data.vector[e.ip]
if(!known || known < e.ts)
ts.queue(e)
})
} else if(null != data.since) {
//THIS IS DEPRECIATED
messages.filter(function (e) {
if(e.ts > data.since)
ts.queue(e)
})
} else
chat.emit('message', data, ms)
})
chat.on('message', function (data, source) {
if(source != ms)
ms.queue(data)
})
ms.queue({vector: getKnown()})
return ms
}
if(opts.server) {
//server
net.createServer(function (stream) {
console.log("connect:", stream.remoteAddress)
stream.pipe(chat.createStream()).pipe(stream)
}).listen(port, myIp)
.on('error', function (e) {
if(e.code === 'EADDRINUSE')
console.error(e)
else
throw e
})
} else {
//client
;(function connect () {
var stream = net.connect(port, randomIp() || opts.seed)
//add this bit when we want to print a connected message.
stream.on('connect', function () {
console.log('connected!')
stream.pipe(chat.createStream()).pipe(stream)
})
stream.on('close', reconnect)
stream.on('error', reconnect)
var timer = setTimeout(function (e) {
stream.end()
}, 3000 + Math.random() * 1000)
function reconnect () {
stream.removeAllListeners()
clearTimeout(timer)
connect()
}
})()
}

distributed

travis

testling

License

MIT

{
"name": "distributed",
"description": "",
"version": "0.0.0",
"homepage": "https://github.com/dominictarr/distributed",
"repository": {
"type": "git",
"url": "git://github.com/dominictarr/distributed.git"
},
"dependencies": {
"my-local-ip": "~1.0.0",
"optimist": "~0.5.2",
"message-stream": "~1.0.0"
},
"devDependencies": {},
"scripts": {
"test": "set -e; for t in test/*.js; do node $t; done"
},
"author": "Dominic Tarr <dominic.tarr@gmail.com> (http://dominictarr.com)",
"license": "MIT"
}
@Raynos
Copy link

Raynos commented Jun 17, 2013

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment