Skip to content

Instantly share code, notes, and snippets.

@Evangenieur
Last active December 22, 2015 08:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Evangenieur/6443661 to your computer and use it in GitHub Desktop.
Save Evangenieur/6443661 to your computer and use it in GitHub Desktop.
All-In-One-File : P2P (server 2 server) Cluster formation using multicast DNS, sharing documents with web clients and servers
browserify = require 'browserify-middleware'
mdns = require "mdns"
os = require "os"
util = require "util"
_ = require "underscore"
request = require "request"
require "colors"
PORT = parseInt(process.argv[2]) or 3001
require("zappajs") PORT, ->
peers = {}
me = null
@server.on "error", (err) =>
if err.code is "EADDRINUSE"
@server.listen ++PORT
@server.on "listening", =>
net_interfaces = _(os.networkInterfaces()).reduce (memo, props, net_interface) ->
memo[net_interface] = (prop.address for prop in props when prop.family is "IPv4")[0]
memo
, {}
console.log "Server Listening on port #{PORT.toString().green}".yellow,
util.inspect net_interfaces, colors: true
p2p_node = mdns.tcp("p2p-server-node")
do advert = ->
ad = mdns.createAdvertisement p2p_node, PORT,
txtRecord:
name: "p2p-server-node"
ad.start()
do discover = ->
browser = mdns.createBrowser p2p_node
browser.on "serviceUp", (service) ->
matching_ips = _(service.addresses).intersection _(net_interfaces).values()
host = service.addresses[0] + ":" + service.port
# escape if it's myself
if service.port is PORT and matching_ips.length
console.log "me", service
doc.set "server_"+host.replace(/[.:]/g, "_"),
type: "server"
host: host
pid: process.pid
return me = host
#return if service.port > PORT
console.log "serviceUp #{host.yellow}".green
do addPeer = ->
peers[host] = do ->
console.log "http://#{host}/_replicate", doc
r = request.put "http://#{host}/_replicate"
r.pipe( doc.createStream() ).pipe r
r.on "end", ->
console.log "connection closed with #{host}".red
delete peers[host]
browser.on "serviceDown", (service) ->
console.log "serviceDown".red #, service
browser.start()
@io.set "log level", 0
@use static: __dirname
### Client & Server Side ####
@shared "/shared.js": ->
root = if window? then window else global
root.crdt = require "crdt"
duplex = require "duplex"
class SocketIOStreams
constructor: (@socket) ->
@channels = {}
createStreamOnChannel: (channel) ->
@socket.on channel, send_to_channel = (data) =>
@channels[ channel ].emit "data", data
@channels[ channel ] = duplex(
_write = (data) =>
@socket.emit channel, data
_end = =>
@socket.removeListener channel, send_to_channel
)
root.init_crdt_streams_over_socket_io = (socket, doc) ->
ds = doc.createStream()
sio_streams = new SocketIOStreams(socket)
sio_chan = sio_streams.createStreamOnChannel("doc")
ds.pipe(sio_chan).pipe(ds)
### Server Side ###
doc = new crdt.Doc()
setInterval ->
console.log (id for id, row of doc.rows)
, 5000
### Client Side ###
@client "/main.js": ->
add_row = (row) ->
$("#container").append """
<div id="row_#{row.id}">
#{JSON.stringify row.state}
</div>
"""
update_row = (row) ->
$("#row_#{row.id}").text JSON.stringify row.state
remove_row = (row) ->
setTimeout ->
$("#row_#{row.id}").remove()
, 100
@connect()
doc_stream = null
@on connect: ->
console.log "connected", id = @socket.socket.sessionid
doc = new crdt.Doc()
doc.on "add", add_row
doc.on "row_update", (row) ->
if $("#row_#{row.id}").length
update_row row
else
add_row row
doc.on "remove", (row) ->
remove_row row
doc.set "client_#{id}", type: "client", ua: navigator.userAgent
doc_stream = init_crdt_streams_over_socket_io(@socket, doc)
@on disconnect: ->
console.log "disconnected", id = @socket.socket.sessionid
console.log doc_stream
doc_stream.end()
doc.removeAllListeners()
doc.rm "client_#{id}"
$("#container").empty()
### Server Side ###
# Web sockets
@on connection: ->
console.log "connected", @id
init_crdt_streams_over_socket_io(@socket, doc)
@on disconnect: ->
console.log "disconnected #{@id}"
doc.rm "client_#{@id}"
# HTTP routes
@get "/": ->
@render "index"
@get "/bundle.js": browserify ["crdt", "duplex"]
@put "/_replicate": ->
console.log "/_replicate".yellow
@req.pipe( doc.createStream() ).pipe @res
@view "index": ->
html ->
head ->
script src: "/zappa/Zappa-simple.js"
script src: "/bundle.js"
script src: "/shared.js"
script src: "/main.js"
body ->
div "#container", ""

P2P Server Cluster PoC

Cluster formation using multicast DNS, sharing documents with web clients and servers through crdt / scuttlebutt / WebSocket ( socket.io ) / server 2 server HTTP stream

All-in-one-file

for fun and beauty of ZappaJS / CoffeeScript / Node.js

Install Dependencies

Assuming you have previously installed node.js / npm

sudo npm install -g coffee-script
npm install browserify-middleware mdns underscore request colors zappajs crdt duplex

Run multiple instances on a multicast network (LAN)

command line (if port not provided, it will start from 3001..) :

coffee p2p_node.coffee [PORT]

ex :

coffee p2p_node.coffee 3001
coffee p2p_node.coffee 3002

Each node will discover others and share the doc

Web Fun

Open your browser and connect to each node using the url format :

 http://HOST:PORT/

Open your browser console, and try to manipulate the data, for example :

On one node :

var row = doc.get("new_id"); var i = 0; var timer = setInterval(function(){row.set("test", i++)}, 500)

On the other :

var row = doc.get("new_id"); var i = 0; var timer = setInterval(function(){row.set("test2", i++)}, 100)

And see the beauty of distributed architecture :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment