Skip to content

Instantly share code, notes, and snippets.

@olekukonko
Forked from calebdoxsey/example.go
Created February 1, 2014 17:13
Show Gist options
  • Save olekukonko/8755356 to your computer and use it in GitHub Desktop.
Save olekukonko/8755356 to your computer and use it in GitHub Desktop.
package main
import (
"code.google.com/p/go.net/websocket"
"net/http"
"log"
)
// all code is in an init function to avoid any name collisions
func init() {
type (
Message struct {
Topic string `json:"topic"`
From uint32 `json:"from"`
To uint32 `json:"to"`
Type string `json:"type"`
Data interface{} `json:"data"`
}
)
var (
topics = make(map[string]map[uint32]chan Message)
in = make(chan Message)
)
send := func(out chan Message, msg Message) {
// drop messages when we're full
select {
case out<-msg:
default:
}
}
subscribe := func(topicId string, userId uint32, out chan Message) {
topic, ok := topics[topicId]
// create a topic if it doesn't exist
if !ok {
topic = make(map[uint32]chan Message)
topics[topicId] = topic
}
// add the user to the topic
topic[userId] = out
// let the other subscribers know the new user just subscribed
for id, c := range topic {
if id != userId {
send(c, Message{
Topic: topicId,
From: userId,
To: id,
Type: "SUBSCRIBED",
})
}
}
}
unsubscribe := func(topicId string, userId uint32) {
topic, ok := topics[topicId]
// just give up if the topic doesn't exist
if !ok {
return
}
_, ok = topic[userId]
// also give up if the user isn't already subscribed to this topic
if !ok {
return
}
// delete the user from the topic
delete(topic, userId)
// let the other subscribers know the user unsubscribed
for id, c := range topic {
send(c, Message{
Topic: topicId,
From: userId,
To: id,
Type: "UNSUBSCRIBED",
})
}
// delete the topic if it's empty
if len(topic) == 0 {
delete(topics, topicId)
}
}
// all incoming messages are handled by this go routine (keeps us from needing
// a lock)
go func() {
for msg := range in {
switch msg.Type {
// drop ID messages
case "ID":
case "SUBSCRIBE":
subscribe(msg.Topic, msg.From, msg.Data.(chan Message))
case "UNSUBSCRIBE":
unsubscribe(msg.Topic, msg.From)
// everything else we forward along
default:
topic, ok := topics[msg.Topic]
if !ok {
continue
}
c, ok := topic[msg.To]
if ok {
send(c, msg)
}
}
}
}()
http.Handle("/channel", websocket.Server{
// make it so we can connect from any domain
Config: websocket.Config{
Header: http.Header(map[string][]string{
"Access-Control-Allow-Origin": []string{"*"},
}),
},
Handler: func(ws *websocket.Conn) {
// generate a new id (using a full cycle PRNG
// http://godoc.org/github.com/cznic/mathutil#NewFC32
// )
id := <-idGenerator
log.Println(id, "connected")
// keep track of our existing subscriptions so we can unsubcribe
subscriptions := []string{}
// this is our output channel which the main goroutine will send messages
// to. We buffer up to 5 messages and start dropping if the client can't
// keep up.
out := make(chan Message, 5)
// on disconnect we unsubscribe from any subscriptions
defer func() {
for _, topicId := range subscriptions {
in <- Message{
Topic: topicId,
From: id,
Type: "UNSUBSCRIBE",
}
}
close(out)
ws.Close()
log.Println(id, "disconnected")
}()
// send the user the id (we want this to be first which is why we send it
// here)
out <- Message{
Type: "ID",
To: id,
}
// fire up a goroutine which will send all the messages
go func() {
for msg := range out {
websocket.JSON.Send(ws, msg)
}
}()
// start reading messages from the client
for {
var msg Message
err := websocket.JSON.Receive(ws, &msg)
if err != nil {
break
}
// overwrite the from id
msg.From = id
// for subscriptions we add it to our list of subscriptions and make the
// Data be the out channel
if msg.Type == "SUBSCRIBE" {
subscriptions = append(subscriptions, msg.Topic)
msg.Data = out
}
// and send it along to the main goroutine
in <- msg
}
},
});
}
RTCPeerConnection = window.RTCPeerConnection || window.webkitRTCPeerConnection || window.mozRTCPeerConnection;
RTCIceCandidate = window.RTCIceCandidate || window.mozRTCIceCandidate;
var chatLog = document.getElementById("chat-log");
var chatForm = document.getElementById("chat-form");
var chatHistory = document.getElementById("chat-history");
var chatInput = document.getElementById("chat-input");
var topic, ws, pc, dc, from, to, candidates = [];
function trace(msg) {
chatLog.textContent += msg + "\n";
}
// connect to the signaling server
function startWebSocket() {
ws = new WebSocket("ws://api.badgerodon.com:9000/channel");
ws.onopen = function() {
trace(" web socket | opened");
ws.send(JSON.stringify({
topic: topic,
type: "SUBSCRIBE"
}));
};
ws.onclose = function() {
trace("web socket | closed");
};
ws.onmessage = function(evt) {
var msg = JSON.parse(evt.data);
switch (msg.type) {
// (1) the server tells us our id
case "ID":
trace(" web socket | your id: " + msg.to);
from = msg.to;
break;
// (2a) the first user to show up will be told when the second user shows up
case "SUBSCRIBED":
trace(" web socket | joined: " + msg.from);
to = msg.from;
// start the connection
startPeerConnection();
// send an offer to the other user
sendOffer();
break;
case "UNSUBSCRIBED":
trace(" web socket | left: " + msg.from);
break;
// (2b) the second user receives the first user's offer
case "OFFER":
trace(" web socket | offer from: " + msg.from + ", " + JSON.stringify(msg.data));
to = msg.from;
// start the connection
startPeerConnection();
// accept the offer
setRemoteDescription(msg.data);
// send an answer to the first user
sendAnswer();
break;
// (3a) the first user receives the answer from the second user
case "ANSWER":
trace(" web socket | answer from: " + msg.from + ", " + JSON.stringify(msg.data));
setRemoteDescription(msg.data);
break;
// (4) both users receive ICE candidates from each other
case "CANDIDATE":
trace(" web socket | candidate from: " + msg.from + ", " + JSON.stringify(msg.data));
addCandidates(msg.data);
break;
}
};
}
// connect to a peer
function startPeerConnection() {
if (dc) {
dc.close();
dc = null;
}
if (pc) {
pc.close();
pc = null;
}
pc = new RTCPeerConnection({
iceServers: [{
// stun allows NAT traversal
url: "stun:stun.l.google.com:19302"
}]
}, {
// we are going to communicate over a data channel
optional: [{
RtpDataChannels: true
}]
});
// send all ice candidates to our peer
pc.onicecandidate = function(evt) {
if (evt.candidate) {
ws.send(JSON.stringify({
topic: topic,
type: "CANDIDATE",
to: to,
data: evt.candidate
}));
}
};
// close on disconnect
pc.oniceconnectionstatechange = function(evt) {
trace("peer connection | ice connection state: " + (pc && pc.iceConnectionState));
if (pc && pc.iceConnectionState === 'disconnected') {
pc.close();
}
};
// create the data channel, use TCP
dc = pc.createDataChannel("RTCDataChannel", {
reliable: true
});
dc.onopen = function(evt) {
trace(" data channel | opened");
};
dc.onclose = function(evt) {
trace(" data channel | closed");
};
dc.onerror = function(evt) {
trace(" data channel | error: " + JSON.stringify(evt));
}
dc.onmessage = function(evt) {
var msg = JSON.parse(evt.data);
switch (msg.type) {
case "MESSAGE":
trace(" data channel | message from: " + msg.from + ", " + msg.data);
onMessage(msg);
break;
}
};
}
function sendOffer() {
pc.createOffer(function(description) {
pc.setLocalDescription(description);
ws.send(JSON.stringify({
topic: topic,
type: "OFFER",
to: to,
data: description
}));
}, null, {
optional: [],
mandatory: {
OfferToReceiveAudio: false,
OfferToReceiveVideo: false
}
});
}
function sendAnswer() {
pc.createAnswer(function(description) {
pc.setLocalDescription(description);
ws.send(JSON.stringify({
topic: topic,
type: "ANSWER",
to: to,
data: description
}));
});
}
function setRemoteDescription(description) {
pc.setRemoteDescription(new RTCSessionDescription(description));
addCandidates();
}
function addCandidates(/* 0 or more candidates */) {
candidates.push.apply(candidates, arguments);
// only actually add the candidates if the remote description has been set
if (pc.remoteDescription) {
for (var i=0; i<candidates.length; i++) {
pc.addIceCandidate(new RTCIceCandidate(candidates[i]));
}
candidates = [];
}
}
function onMessage(msg) {
chatHistory.textContent += msg.from + ": " + msg.data + "\n";
}
function onSubmit(evt) {
evt.preventDefault();
var text = chatInput.value;
chatInput.value = "";
var msg = {
type: "MESSAGE",
data: text,
from: from,
to: to
};
onMessage(msg);
dc.send(JSON.stringify(msg));
}
chatForm.addEventListener("submit", onSubmit);
function main() {
while (!topic) {
topic = prompt("Please Enter a Topic", "");
}
startWebSocket();
}
main();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment