Skip to content

Instantly share code, notes, and snippets.

@NikhilSharmaWe
Last active September 21, 2023 20:10
Show Gist options
  • Save NikhilSharmaWe/635fcfeae4119cfd3977fbeb18599ccd to your computer and use it in GitHub Desktop.
Save NikhilSharmaWe/635fcfeae4119cfd3977fbeb18599ccd to your computer and use it in GitHub Desktop.
package main
import (
"encoding/json"
"flag"
"log"
"net/http"
"sync"
"time"
"github.com/fasthttp/websocket"
"github.com/labstack/echo/v4"
"github.com/pion/rtcp"
"github.com/pion/webrtc/v3"
)
// Package-level state shared by every websocket handler in this file.
var (
// upgrader turns an incoming HTTP request into a websocket connection.
upgrader = websocket.Upgrader{
// The origin check unconditionally returns true, so requests from any origin are accepted.
CheckOrigin: func(r *http.Request) bool {
return true
},
}
// listLock is taken (always via Lock) by the functions in this file before they touch peerConnections or trackLocals.
listLock sync.RWMutex
// peerConnections holds one entry per client that websocketHandler has registered; signalPeerConnections drops entries whose connection is closed.
peerConnections []peerConnectionState
// trackLocals maps a track ID to the local track that addTrack created for an incoming remote track.
// signalPeerConnections attaches these tracks to the registered peers so the media is forwarded to them.
trackLocals map[string]*webrtc.TrackLocalStaticRTP
)
// websocketMessage is the JSON envelope exchanged with the client over the websocket.
type websocketMessage struct {
// Event names the message kind; this file sends "candidate" and "offer" and handles incoming "candidate" and "answer".
Event string `json:"event"`
// Data carries the payload as a JSON document encoded into a string.
Data string `json:"data"`
}
// peerConnectionState pairs a client's PeerConnection with the websocket used to signal it.
type peerConnectionState struct {
// peerConnection is the WebRTC connection to the client.
peerConnection *webrtc.PeerConnection
// websocket is the signaling channel over which offers are written to the client.
websocket *threadSafeWriter
}
// threadSafeWriter embeds a websocket connection together with a mutex.
// Only the WriteJSON method defined in this file takes the mutex; the other
// methods promoted from *websocket.Conn are not guarded by it.
type threadSafeWriter struct {
*websocket.Conn
sync.Mutex
}
func main() {
flag.Parse()
// Init other state
log.SetFlags(0)
trackLocals = map[string]*webrtc.TrackLocalStaticRTP{}
e := echo.New()
e.GET("/websocket", websocketHandler)
e.GET("/hello", handler)
e.GET("/", func(c echo.Context) error {
return c.File("index.html")
})
go func() {
for range time.NewTicker(time.Second * 3).C {
dispatchKeyFrame()
}
}()
log.Fatal(e.Start(":8080"))
}
// handler answers with a plain-text "Hello" and HTTP status 200.
func handler(c echo.Context) error {
	const greeting = "Hello"
	return c.String(http.StatusOK, greeting)
}
// websocketHandler upgrades the request to a websocket, creates a
// PeerConnection for the client, registers it in peerConnections and then
// processes signaling messages from the client until reading or handling one
// fails. The websocket and the PeerConnection are both closed when the
// function returns. The error that ended the session is logged and returned.
func websocketHandler(c echo.Context) error {
	unsafeConn, err := upgrader.Upgrade(c.Response(), c.Request(), nil)
	if err != nil {
		log.Print("upgrade:", err)
		return err
	}

	conn := &threadSafeWriter{unsafeConn, sync.Mutex{}}

	// When this frame returns close the websocket.
	defer conn.Close()

	peerConnection, err := webrtc.NewPeerConnection(webrtc.Configuration{
		ICEServers: []webrtc.ICEServer{
			{
				URLs: []string{"stun:stun.l.google.com:19302"},
			},
		},
		SDPSemantics: webrtc.SDPSemanticsUnifiedPlanWithFallback,
	})
	if err != nil {
		log.Println(err)
		return err
	}

	// When this frame returns close the PeerConnection.
	defer peerConnection.Close()

	// Accept one audio and one video track incoming.
	for _, typ := range []webrtc.RTPCodecType{webrtc.RTPCodecTypeVideo, webrtc.RTPCodecTypeAudio} {
		if _, err := peerConnection.AddTransceiverFromKind(typ, webrtc.RTPTransceiverInit{
			Direction: webrtc.RTPTransceiverDirectionRecvonly,
		}); err != nil {
			log.Println(err)
			return err
		}
	}

	// Register the new peer so signalPeerConnections and dispatchKeyFrame see it.
	listLock.Lock()
	peerConnections = append(peerConnections, peerConnectionState{peerConnection, conn})
	listLock.Unlock()

	// Trickle ICE: forward each server-side candidate to the client as a
	// "candidate" message. A nil candidate carries nothing to send.
	peerConnection.OnICECandidate(func(i *webrtc.ICECandidate) {
		if i == nil {
			return
		}

		candidateString, err := json.Marshal(i.ToJSON())
		if err != nil {
			log.Println(err)
			return
		}

		if writeErr := conn.WriteJSON(&websocketMessage{
			Event: "candidate",
			Data:  string(candidateString),
		}); writeErr != nil {
			log.Println(writeErr)
		}
	})

	// A failed connection is closed; once a connection is closed the
	// remaining peers are re-signaled, which also drops the closed peer from
	// peerConnections.
	peerConnection.OnConnectionStateChange(func(pcs webrtc.PeerConnectionState) {
		switch pcs {
		case webrtc.PeerConnectionStateFailed:
			if err := peerConnection.Close(); err != nil {
				log.Println(err)
			}
		case webrtc.PeerConnectionStateClosed:
			signalPeerConnections()
		}
	})

	peerConnection.OnTrack(func(tr *webrtc.TrackRemote, r *webrtc.RTPReceiver) {
		// Create a local track to fan the incoming media out to all peers,
		// and unregister it again when copying stops.
		trackLocal := addTrack(tr)
		defer removeTrack(trackLocal)

		// Copy packets from the remote track to the local track until
		// either side reports an error.
		buf := make([]byte, 1500)
		for {
			i, _, err := tr.Read(buf)
			if err != nil {
				return
			}

			if _, err := trackLocal.Write(buf[:i]); err != nil {
				return
			}
		}
	})

	// Send the initial offer to the new peer (and refresh all the others).
	signalPeerConnections()

	for {
		_, raw, err := conn.ReadMessage()
		if err != nil {
			log.Println(err)
			return err
		}

		// Decode into a fresh struct value for every message. Decoding into a
		// shared pointer would let a JSON `null` payload set that pointer to
		// nil (panicking on the field access below) and would let fields
		// omitted from one message keep the previous message's values.
		var message websocketMessage
		if err := json.Unmarshal(raw, &message); err != nil {
			log.Println(err)
			return err
		}

		// Events other than "candidate" and "answer" are ignored.
		switch message.Event {
		case "candidate":
			candidate := webrtc.ICECandidateInit{}
			if err := json.Unmarshal([]byte(message.Data), &candidate); err != nil {
				log.Println(err)
				return err
			}

			if err := peerConnection.AddICECandidate(candidate); err != nil {
				log.Println(err)
				return err
			}
		case "answer":
			answer := webrtc.SessionDescription{}
			if err := json.Unmarshal([]byte(message.Data), &answer); err != nil {
				log.Println(err)
				return err
			}

			if err := peerConnection.SetRemoteDescription(answer); err != nil {
				log.Println(err)
				return err
			}
		}
	}
}
// addTrack creates a local RTP track with the same codec capability, ID and
// stream ID as the remote track t, stores it in trackLocals under its ID and
// returns it. It panics if the local track cannot be created. After listLock
// is released, every peer is re-signaled.
func addTrack(t *webrtc.TrackRemote) *webrtc.TrackLocalStaticRTP {
	listLock.Lock()
	// Deferred calls run last-in-first-out: the lock is released first and
	// only then are the peers re-signaled (signalPeerConnections takes the
	// lock itself).
	defer signalPeerConnections()
	defer listLock.Unlock()

	capability := t.Codec().RTPCodecCapability
	local, err := webrtc.NewTrackLocalStaticRTP(capability, t.ID(), t.StreamID())
	if err != nil {
		panic(err)
	}

	trackLocals[local.ID()] = local
	return local
}
// removeTrack deletes t from trackLocals and, once listLock has been
// released, re-signals every peer.
func removeTrack(t *webrtc.TrackLocalStaticRTP) {
	listLock.Lock()
	// Deferred calls run last-in-first-out: unlock first, then re-signal.
	defer signalPeerConnections()
	defer listLock.Unlock()

	delete(trackLocals, t.ID())
}
// signalPeerConnections brings every registered PeerConnection in line with
// trackLocals and sends each peer a new offer over its websocket.
// listLock is held for the whole pass; after it is released dispatchKeyFrame is called.
func signalPeerConnections() {
listLock.Lock()
defer func() {
listLock.Unlock()
dispatchKeyFrame()
}()
// attemptSync performs one pass over peerConnections.
// It returns true when the pass has to be started over: either a closed peer
// was removed from the slice or one of the calls on a peer failed.
attemptSync := func() (tryAgain bool) {
for i := range peerConnections {
// Drop peers whose connection is closed. The slice was modified while ranging over it, so restart the pass.
if peerConnections[i].peerConnection.ConnectionState() == webrtc.PeerConnectionStateClosed {
peerConnections = append(peerConnections[:i], peerConnections[i+1:]...)
return true
}
// existingSenders collects the IDs of tracks already attached to this peer, either as a sender or as a receiver.
existingSenders := map[string]bool{}
for _, sender := range peerConnections[i].peerConnection.GetSenders() {
if sender.Track() == nil {
continue
}
existingSenders[sender.Track().ID()] = true
// Stop sending a track that is no longer present in trackLocals.
if _, ok := trackLocals[sender.Track().ID()]; !ok {
if err := peerConnections[i].peerConnection.RemoveTrack(sender); err != nil {
return true
}
}
}
// Tracks received from this peer are marked as existing too, so they are not sent back to it (no loopback).
for _, receiver := range peerConnections[i].peerConnection.GetReceivers() {
if receiver.Track() == nil {
continue
}
existingSenders[receiver.Track().ID()] = true
}
// Attach every track from trackLocals whose ID was not marked above.
for trackID := range trackLocals {
if _, ok := existingSenders[trackID]; !ok {
if _, err := peerConnections[i].peerConnection.AddTrack(trackLocals[trackID]); err != nil {
return true
}
}
}
// Create an offer, set it as the local description and send it to the client as an "offer" message.
offer, err := peerConnections[i].peerConnection.CreateOffer(nil)
if err != nil {
return true
}
if err = peerConnections[i].peerConnection.SetLocalDescription(offer); err != nil {
return true
}
offerString, err := json.Marshal(offer)
if err != nil {
return true
}
if err = peerConnections[i].websocket.WriteJSON(&websocketMessage{
Event: "offer",
Data: string(offerString),
}); err != nil {
return true
}
}
return false
}
// Repeat the pass until one completes without asking for a restart.
for syncAttempt := 0; ; syncAttempt++ {
if syncAttempt == 25 {
// After 25 restarted passes give up for now: returning releases listLock (via the defer above),
// and a new goroutine tries again three seconds later.
go func() {
time.Sleep(3 * time.Second)
signalPeerConnections()
}()
return
}
if !attemptSync() {
break
}
}
}
// dispatchKeyFrame sends a Picture Loss Indication for every track currently
// being received on every registered PeerConnection. listLock is held while
// it runs, and write errors are ignored.
func dispatchKeyFrame() {
	listLock.Lock()
	defer listLock.Unlock()

	for _, state := range peerConnections {
		pc := state.peerConnection
		for _, receiver := range pc.GetReceivers() {
			track := receiver.Track()
			if track == nil {
				continue
			}

			pli := &rtcp.PictureLossIndication{
				MediaSSRC: uint32(track.SSRC()),
			}
			// The write result is deliberately discarded, as in the
			// original best-effort behaviour.
			_ = pc.WriteRTCP([]rtcp.Packet{pli})
		}
	}
}
// WriteJSON writes v to the embedded websocket connection as JSON while
// holding the writer's mutex, so calls to this method do not overlap. It
// returns whatever the underlying connection's WriteJSON returns.
func (t *threadSafeWriter) WriteJSON(v interface{}) error {
	t.Mutex.Lock()
	defer t.Mutex.Unlock()

	return t.Conn.WriteJSON(v)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment