Skip to content

Instantly share code, notes, and snippets.

@notedit
Created March 7, 2017 02:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save notedit/0e6d97272d46a7080fd6ea10a45f7a59 to your computer and use it in GitHub Desktop.
Save notedit/0e6d97272d46a7080fd6ea10a45f7a59 to your computer and use it in GitHub Desktop.
go janus example
package main
import (
"encoding/json"
"net/http"
"time"
"./janus"
log "github.com/Sirupsen/logrus"
"github.com/bitly/go-simplejson"
websocket "github.com/kataras/go-websocket"
)
// Message is the JSON envelope exchanged with the websocket client.
// Every field is optional and is omitted from the encoded JSON when it
// holds its zero value.
type Message struct {
	// Session is the Janus session id the message belongs to.
	Session uint64 `json:"session,omitempty"`
	// Handle is the Janus plugin handle id the message refers to.
	Handle uint64 `json:"handle,omitempty"`
	// Type names the event, e.g. "created", "attached", "joined",
	// "published", "publishers" or "leaving".
	Type string `json:"type,omitempty"`
	// Room is a room id; it is not populated by the code in this file.
	Room uint64 `json:"room,omitempty"`
	// Connection is a connection identifier; it is not populated by the
	// code in this file.
	Connection string `json:"connection,omitempty"`
	// Data carries the event-specific payload (SDP, publishers list, ...).
	Data map[string]interface{} `json:"data,omitempty"`
}
// handleWebsocketConnection bridges one websocket client to a Janus session.
//
// It creates a Janus session through the package-level gateway, starts the
// keep-alive loop for it, registers the message and disconnect callbacks on c
// and finally sends a "created" message carrying the session id.
//
// Incoming messages are JSON objects with a "type" field and an optional
// "data" object. "attach" and "join" each create a new videoroom plugin
// handle; every other type ("leave", "publish", "unpublish", "subcribe" /
// "subscribe", "ice") must carry the id of an existing handle in "handle".
// All clients are placed in the hard-coded room 1234.
func handleWebsocketConnection(c websocket.Connection) {
	// The only room used by this example; the room sent by the client is
	// logged but not honoured.
	const videoRoomID = 1234

	log.Debug("handleWebsocketConnection ")

	session, err := gateway.Create()
	if err != nil {
		log.Error("gateway.Create ", err)
		return
	}

	// stop is closed once the connection has been torn down so that the
	// background goroutines started for this session can terminate.
	stop := make(chan struct{})
	go keeplive(session, stop)

	// send encodes a reply and writes it to the websocket.
	send := func(reply *Message) {
		mess, err := json.Marshal(reply)
		if err != nil {
			log.Error("json.Marshal ", err)
			return
		}
		c.EmitMessage(mess)
	}

	c.OnMessage(func(msg []byte) {
		js, err := simplejson.NewJson(msg)
		if err != nil {
			log.Error("websocekt data parse error ", err)
			return
		}
		etype, err := js.Get("type").String()
		if err != nil {
			return
		}
		// Get is used instead of CheckGet: CheckGet yields a nil *Json when
		// "data" is missing, and calling Get on that nil value would panic.
		data := js.Get("data")

		// Message types that create a brand new plugin handle.
		switch etype {
		case "attach":
			// Missing or malformed ids deliberately fall back to zero.
			userID, _ := data.Get("userid").Uint64()
			privateID, _ := data.Get("private_id").Uint64()

			handle, err := session.Attach("janus.plugin.videoroom")
			if err != nil {
				log.Error("session.Attach ", err)
				return // TODO: notify the client about this error
			}
			go watch(session, handle, c, stop)
			log.Debug("attach message ", data)

			// Join the room as a listener of the requested feed.
			eventMsg, err := handle.Message(map[string]interface{}{
				"request":    "join",
				"room":       videoRoomID,
				"feed":       userID,
				"ptype":      "listener",
				"private_id": privateID,
			}, nil)
			if err != nil {
				log.Error("handle.Message ", err)
				return
			}
			event := eventMsg.Plugindata.Data["videoroom"]
			if event == nil {
				return
			}
			// Checked assertion: a non-string value must not crash the server.
			if name, ok := event.(string); !ok || name != "attached" {
				log.Error("attached event expected ")
				return
			}
			reply := &Message{
				Session: session.Id,
				Handle:  handle.Id,
				Type:    "attached",
				Data: map[string]interface{}{
					"userid": userID,
					"sdp":    eventMsg.Jsep,
				},
			}
			send(reply)
			log.Debug("send attached message ", reply)
			return

		case "join":
			// Missing or malformed fields deliberately fall back to zero values.
			room, _ := data.Get("room").Uint64()
			userID, _ := data.Get("userid").Uint64()
			role, _ := data.Get("role").String()
			log.Debug("join room ", room, " userid ", userID, " connection ", c.ID())

			handle, err := session.Attach("janus.plugin.videoroom")
			if err != nil {
				log.Error("session.Attach ", err)
				return // TODO: notify the client about this error
			}
			log.Debug("handle attached ", handle.Id)
			go watch(session, handle, c, stop)

			eventMsg, err := handle.Message(map[string]interface{}{
				"request": "join",
				"room":    videoRoomID,
				"id":      userID,
				"ptype":   role,
				"display": " ",
			}, nil)
			if err != nil {
				// eventMsg must not be touched after a failed request.
				log.Error("handle.Message ", err)
				return
			}
			log.Debug("joined from plugin", eventMsg.Plugindata.Plugin)
			if name, ok := eventMsg.Plugindata.Data["videoroom"].(string); ok && name == "joined" {
				send(&Message{
					Session: session.Id,
					Handle:  handle.Id,
					Type:    "joined",
					Data: map[string]interface{}{
						"private_id": eventMsg.Plugindata.Data["private_id"],
						"publishers": eventMsg.Plugindata.Data["publishers"],
					},
				})
			}
			return
		}

		// Every remaining message type operates on an existing handle.
		handleID, err := js.Get("handle").Uint64()
		if err != nil {
			log.Error("OnMessage can not get handleid")
			return
		}
		session.Lock()
		handle := session.Handles[handleID]
		session.Unlock()
		if handle == nil {
			log.Error("can not find handle ")
			return
		}

		switch etype {
		case "leave":
			eventMsg, err := handle.Message(map[string]interface{}{
				"request": "leave",
			}, nil)
			if err != nil {
				log.Error("handle.leave error ", err)
			} else {
				log.Debug(eventMsg.Plugindata)
			}
			// Detach even when the leave request failed, so the handle is
			// not left behind.
			if _, err := handle.Detach(); err != nil {
				log.Error("handle.Detach ", err)
			}

		case "publish":
			sdp := data.Get("sdp").Interface()
			eventMsg, err := handle.Message(map[string]interface{}{
				"request": "configure",
				"audio":   true,
				"video":   true,
				"bitrate": 128000,
			}, sdp)
			if err != nil {
				log.Error("handle.Message ", err)
				return
			}
			log.Debug("publish ", eventMsg.Plugindata.Data, eventMsg.Jsep)
			send(&Message{
				Session: session.Id,
				Handle:  handle.Id,
				Type:    "published",
				Data: map[string]interface{}{
					"sdp": eventMsg.Jsep,
				},
			})

		case "unpublish":
			if _, err := handle.Message(map[string]interface{}{
				"request": "unpublish",
			}, nil); err != nil {
				log.Error("handle.Message ", err)
			}

		// "subcribe" is the historical (misspelled) name existing clients
		// send; the correct spelling is accepted as well.
		case "subcribe", "subscribe":
			sdp := data.Get("sdp").Interface()
			eventMsg, err := handle.Message(map[string]interface{}{
				"request": "start",
				"room":    videoRoomID,
			}, sdp)
			if err != nil {
				log.Error("handle.Message ", err)
				return
			}
			log.Debug("subcribe answer ", sdp)
			log.Debug("subcribe result ", eventMsg)

		case "ice":
			candidate := data.Get("candidate").Interface()
			ack, err := handle.Trickle(candidate)
			if err != nil {
				log.Error("handle.Trickle ", err)
				return
			}
			log.Debug("handle.Trickle ", ack)
		}
	})

	c.OnDisconnect(func() {
		log.Debug("connection ", c.ID(), " disconnect")

		// Snapshot the handles under the session lock, then detach without
		// holding it: the map is shared with the message callback, and
		// Detach may need the lock itself (assumption, not visible here).
		session.Lock()
		handles := make([]*janus.Handle, 0, len(session.Handles))
		for _, h := range session.Handles {
			handles = append(handles, h)
		}
		session.Unlock()

		for _, h := range handles {
			if _, err := h.Detach(); err != nil {
				log.Error("handle.Detach ", err)
			}
		}
		if _, err := session.Destroy(); err != nil {
			log.Error("session.Destroy error ", err)
		}

		// Signal the background goroutines only after teardown. The guard
		// keeps a repeated disconnect callback from closing stop twice.
		select {
		case <-stop:
		default:
			close(stop)
		}
	})

	send(&Message{
		Session: session.Id,
		Type:    "created",
	})
}
// watch forwards asynchronous Janus events for one plugin handle to the
// websocket client.
//
// It reads from handle.Events until stop is signalled (closed or sent to) or
// the events channel is closed. Slow-link and media notifications are only
// logged. Plugin events containing "publishers" or "leaving" are relayed to
// the client as "publishers" / "leaving" messages tagged with the session and
// handle ids. It is meant to run in its own goroutine.
func watch(session *janus.Session, handle *janus.Handle, c websocket.Connection, stop chan struct{}) {
	// emit encodes a reply and writes it to the websocket; it returns the
	// encoded bytes, or nil when encoding failed.
	emit := func(reply *Message) []byte {
		mess, err := json.Marshal(reply)
		if err != nil {
			log.Error("json.Marshal ", err)
			return nil
		}
		c.EmitMessage(mess)
		return mess
	}

	for {
		var msg interface{}
		select {
		case <-stop:
			// The connection is gone; nothing is left to forward.
			return
		case ev, ok := <-handle.Events:
			if !ok {
				// A closed channel would otherwise yield nil forever.
				return
			}
			msg = ev
		}

		switch msg := msg.(type) {
		case *janus.SlowLinkMsg:
			log.Debug("SlowLinkMsg type ", msg)
		case *janus.MediaMsg:
			log.Debug("mediaEvent type ", msg.Type, " receiving ", msg.Receiving)
		case *janus.EventMsg:
			data := msg.Plugindata.Data
			log.Debug("event type ", data)
			switch {
			case data["publishers"] != nil:
				// A new publisher has joined the room.
				reply := &Message{
					Session: session.Id,
					Handle:  handle.Id,
					Type:    "publishers",
					Data: map[string]interface{}{
						"publishers": data["publishers"],
					},
				}
				if mess := emit(reply); mess != nil {
					log.Debug("send publishers", reply, mess)
				}
			case data["leaving"] != nil:
				// Somebody has left the room.
				emit(&Message{
					Session: session.Id,
					Handle:  handle.Id,
					Type:    "leaving",
					Data: map[string]interface{}{
						"leaving": data["leaving"],
					},
				})
			case data["unpublished"] != nil:
				// Not forwarded to the client for now.
			case data["error"] != nil:
				// Surface plugin errors instead of dropping them silently.
				log.Error("plugin error event ", data["error"])
			}
		}
	}
}
// keeplive calls session.KeepAlive every 30 seconds until a value is
// received from stop (or stop is closed), at which point the ticker is
// released and the function returns. It is meant to run in its own goroutine.
func keeplive(session *janus.Session, stop chan struct{}) {
	tick := time.NewTicker(30 * time.Second)
	// Release the ticker on the single exit path below.
	defer tick.Stop()

	for {
		select {
		case <-stop:
			return
		case <-tick.C:
			session.KeepAlive()
		}
	}
}
// createSessionHandler is an HTTP handler that creates a new Janus session
// through the package-level gateway and responds with a JSON object of the
// form {"session_id": <id>}.
//
// If the session cannot be created, or the response cannot be encoded, it
// answers with 500 Internal Server Error instead of dereferencing a nil
// session.
func createSessionHandler(w http.ResponseWriter, r *http.Request) {
	session, err := gateway.Create()
	if err != nil {
		log.Error("gateway.Create ", err)
		http.Error(w, "could not create janus session", http.StatusInternalServerError)
		return
	}

	result, err := json.Marshal(map[string]interface{}{
		"session_id": session.Id,
	})
	if err != nil {
		log.Error("json.Marshal ", err)
		http.Error(w, "could not encode response", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	if _, err := w.Write(result); err != nil {
		// The client most likely went away; nothing more can be sent.
		log.Error("response write ", err)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment