Created
March 7, 2017 02:08
-
-
Save notedit/0e6d97272d46a7080fd6ea10a45f7a59 to your computer and use it in GitHub Desktop.
go janus exmple
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"encoding/json" | |
"net/http" | |
"time" | |
"./janus" | |
log "github.com/Sirupsen/logrus" | |
"github.com/bitly/go-simplejson" | |
websocket "github.com/kataras/go-websocket" | |
) | |
type Message struct { | |
Session uint64 `json:"session,omitempty"` | |
Handle uint64 `json:"handle,omitempty"` | |
Type string `json:"type,omitempty"` | |
Room uint64 `json:"room,omitempty"` | |
Connection string `json:"connection,omitempty"` | |
Data map[string]interface{} `json:"data,omitempty"` | |
} | |
// room is 1234 | |
func handleWebsocketConnection(c websocket.Connection) { | |
log.Debug("handleWebsocketConnection ") | |
session, err := gateway.Create() | |
if err != nil { | |
log.Error("gateway.Create ", err) | |
return | |
} | |
stop := make(chan struct{}) | |
go keeplive(session, stop) | |
c.OnMessage(func(msg []byte) { | |
js, err := simplejson.NewJson(msg) | |
if err != nil { | |
log.Error("websocekt data parse error ", err) | |
return | |
} | |
etype, err := js.Get("type").String() | |
if err != nil { | |
return | |
} | |
data, _ := js.CheckGet("data") | |
if etype == "attach" { | |
userID, _ := data.Get("userid").Uint64() | |
//room, _ := data.Get("room").Uint64() | |
//role, _ := data.Get("role").String() | |
privateID, _ := data.Get("private_id").Uint64() | |
handle, err := session.Attach("janus.plugin.videoroom") | |
if err != nil { | |
log.Error("session.Attach ", err) | |
return // todo need notify client this error | |
} | |
go watch(session, handle, c, stop) | |
log.Debug("attach message ", data) | |
// then join | |
message := map[string]interface{}{ | |
"request": "join", | |
"room": 1234, | |
"feed": userID, | |
"ptype": "listener", | |
"private_id": privateID, | |
} | |
eventMsg, err := handle.Message(message, nil) | |
if err != nil { | |
log.Error("handle.Message ", err) | |
return | |
} | |
event := eventMsg.Plugindata.Data["videoroom"] | |
if event == nil { | |
return | |
} | |
event = event.(string) | |
if event != "attached" { | |
log.Error("attached event expected ") | |
return | |
} | |
replyMess := &Message{ | |
Session: session.Id, | |
Handle: handle.Id, | |
Type: "attached", | |
Data: map[string]interface{}{ | |
"userid": userID, | |
"sdp": eventMsg.Jsep, | |
}, | |
} | |
mess, _ := json.Marshal(replyMess) | |
c.EmitMessage(mess) | |
log.Debug("send attached message ", replyMess) | |
return | |
} | |
// we should create a handle | |
if etype == "join" { | |
room, _ := data.Get("room").Uint64() | |
userID, _ := data.Get("userid").Uint64() | |
role, _ := data.Get("role").String() | |
log.Debug("join room ", room, " userid ", userID, " connection ", c.ID()) | |
// we touch before we join | |
handle, err := session.Attach("janus.plugin.videoroom") | |
if err != nil { | |
log.Error("session.Attach ", err) | |
return // todo need notify client this error | |
} | |
log.Debug("handle attached ", handle.Id) | |
go watch(session, handle, c, stop) | |
message := map[string]interface{}{ | |
"request": "join", | |
"room": 1234, | |
"id": userID, | |
"ptype": role, | |
"display": " ", | |
} | |
eventMsg, err := handle.Message(message, nil) | |
if err != nil { | |
log.Error("handle.Message ", err) | |
} | |
log.Debug("joined from plugin", eventMsg.Plugindata.Plugin) | |
event := eventMsg.Plugindata.Data["videoroom"] | |
if event == nil { | |
return | |
} | |
event = event.(string) | |
if event == "joined" { | |
replyMess := &Message{ | |
Session: session.Id, | |
Handle: handle.Id, | |
Type: "joined", | |
Data: map[string]interface{}{ | |
"private_id": eventMsg.Plugindata.Data["private_id"], | |
"publishers": eventMsg.Plugindata.Data["publishers"], | |
}, | |
} | |
mess, _ := json.Marshal(replyMess) | |
c.EmitMessage(mess) | |
} | |
return | |
} | |
// now we get the handle | |
handleID, err := js.Get("handle").Uint64() | |
if err != nil { | |
log.Error("OnMessage can not get handleid") | |
return | |
} | |
session.Lock() | |
handle := session.Handles[handleID] | |
session.Unlock() | |
if handle == nil { | |
log.Error("can not find handle ") | |
return | |
} | |
if etype == "leave" { | |
message := map[string]interface{}{ | |
"request": "leave", | |
} | |
eventMsg, err := handle.Message(message, nil) | |
if err != nil { | |
log.Error("handle.leave error ", err) | |
} | |
log.Debug(eventMsg.Plugindata) | |
_, err = handle.Detach() | |
if err != nil { | |
log.Error("handle.Detach ", err) | |
} | |
return | |
} else if etype == "publish" { | |
//audio, _ := data.Get("media").Get("audio").Bool() | |
//video, _ := data.Get("media").Get("video").Bool() | |
sdp := data.Get("sdp").Interface() | |
message := map[string]interface{}{ | |
"request": "configure", | |
"audio": true, | |
"video": true, | |
"bitrate": 128000, | |
} | |
eventMsg, err := handle.Message(message, sdp) | |
if err != nil { | |
log.Error("handle.Message ", err) | |
} | |
log.Debug("publish ", eventMsg.Plugindata.Data, eventMsg.Jsep) | |
replyMess := &Message{ | |
Session: session.Id, | |
Handle: handle.Id, | |
Type: "published", | |
Data: map[string]interface{}{ | |
"sdp": eventMsg.Jsep, | |
}, | |
} | |
mess, _ := json.Marshal(replyMess) | |
c.EmitMessage(mess) | |
} else if etype == "unpublish" { | |
message := map[string]interface{}{ | |
"request": "unpublish", | |
} | |
_, err := handle.Message(message, nil) | |
if err != nil { | |
log.Error("handle.Message ", err) | |
} | |
} else if etype == "subcribe" { | |
sdp := data.Get("sdp").Interface() | |
message := map[string]interface{}{ | |
"request": "start", | |
"room": 1234, | |
} | |
eventMsg, err := handle.Message(message, sdp) | |
if err != nil { | |
log.Error("handle.Message ", err) | |
} | |
log.Debug("subcribe answer ", sdp) | |
log.Debug("subcribe result ", eventMsg) | |
} else if etype == "ice" { | |
candidate := data.Get("candidate").Interface() | |
eventMsg, err := handle.Trickle(candidate) | |
if err != nil { | |
log.Error("handle.Trickle ", err) | |
} | |
log.Debug("handle.Trickle ", eventMsg) | |
} | |
}) | |
c.OnDisconnect(func() { | |
// log.Debug("connection ", c.ID(), " disconnect") | |
// _, err := session.Destroy() | |
// if err != nil { | |
// log.Error("session.Destroy error ", err) | |
// } | |
log.Debug("connection ", c.ID(), " disconnect") | |
for _, value := range session.Handles { | |
value.Detach() | |
} | |
_, err := session.Destroy() | |
if err != nil { | |
log.Error("session.Destroy error ", err) | |
} | |
}) | |
message := &Message{ | |
Session: session.Id, | |
Type: "created", | |
} | |
mess, _ := json.Marshal(message) | |
c.EmitMessage(mess) | |
} | |
func watch(session *janus.Session, handle *janus.Handle, c websocket.Connection, stop chan struct{}) { | |
// 需要找到一个好的方法 退出 | |
for { | |
msg := <-handle.Events | |
switch msg := msg.(type) { | |
case *janus.SlowLinkMsg: | |
log.Debug("SlowLinkMsg type ", msg) | |
case *janus.MediaMsg: | |
log.Debug("mediaEvent type ", msg.Type, " receiving ", msg.Receiving) | |
case *janus.EventMsg: | |
log.Debug("event type ", msg.Plugindata.Data) | |
if msg.Plugindata.Data["publishers"] != nil { | |
// 有一个新的publisher 进来 | |
replyMess := &Message{ | |
Session: session.Id, | |
Handle: handle.Id, | |
Type: "publishers", | |
Data: map[string]interface{}{ | |
"publishers": msg.Plugindata.Data["publishers"], | |
}, | |
} | |
mess, _ := json.Marshal(replyMess) | |
c.EmitMessage(mess) | |
log.Debug("send publishers", replyMess, mess) | |
} else if msg.Plugindata.Data["leaving"] != nil { | |
// 有人离开了 | |
replyMess := &Message{ | |
Session: session.Id, | |
Handle: handle.Id, | |
Type: "leaving", | |
Data: map[string]interface{}{ | |
"leaving": msg.Plugindata.Data["leaving"], | |
}, | |
} | |
mess, _ := json.Marshal(replyMess) | |
c.EmitMessage(mess) | |
} else if msg.Plugindata.Data["unpublished"] != nil { | |
// does not have this for now | |
} else if msg.Plugindata.Data["error"] != nil { | |
} | |
} | |
} | |
} | |
func keeplive(session *janus.Session, stop chan struct{}) { | |
ticker := time.NewTicker(time.Second * 30) | |
for { | |
select { | |
case <-ticker.C: | |
session.KeepAlive() | |
case <-stop: | |
ticker.Stop() | |
return | |
} | |
} | |
} | |
func createSessionHandler(w http.ResponseWriter, r *http.Request) { | |
session, err := gateway.Create() | |
if err != nil { | |
log.Error("gateway.Create ", err) | |
} | |
result, _ := json.Marshal(map[string]interface{}{ | |
"session_id": session.Id, | |
}) | |
w.Header().Set("Content-Type", "application/json") | |
w.Write(result) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment