Last active
September 21, 2023 20:10
-
-
Save NikhilSharmaWe/635fcfeae4119cfd3977fbeb18599ccd to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"encoding/json" | |
"flag" | |
"log" | |
"net/http" | |
"sync" | |
"time" | |
"github.com/fasthttp/websocket" | |
"github.com/labstack/echo/v4" | |
"github.com/pion/rtcp" | |
"github.com/pion/webrtc/v3" | |
) | |
var ( | |
upgrader = websocket.Upgrader{ | |
CheckOrigin: func(r *http.Request) bool { | |
return true | |
}, | |
} | |
listLock sync.RWMutex | |
peerConnections []peerConnectionState | |
// The trackLocals map in the provided code is used to keep track of all the tracks that the current client wants to forward to other clients in the video conferencing application. | |
trackLocals map[string]*webrtc.TrackLocalStaticRTP | |
) | |
// websocketMessage is the JSON envelope exchanged over the signaling
// WebSocket.
type websocketMessage struct {
	// Event names the kind of message; this file uses "candidate", "answer"
	// and "offer".
	Event string `json:"event"`
	// Data carries the payload as a JSON-encoded string.
	Data string `json:"data"`
}
type peerConnectionState struct { | |
peerConnection *webrtc.PeerConnection | |
websocket *threadSafeWriter | |
} | |
type threadSafeWriter struct { | |
*websocket.Conn | |
sync.Mutex | |
} | |
func main() { | |
flag.Parse() | |
// Init other state | |
log.SetFlags(0) | |
trackLocals = map[string]*webrtc.TrackLocalStaticRTP{} | |
e := echo.New() | |
e.GET("/websocket", websocketHandler) | |
e.GET("/hello", handler) | |
e.GET("/", func(c echo.Context) error { | |
return c.File("index.html") | |
}) | |
go func() { | |
for range time.NewTicker(time.Second * 3).C { | |
dispatchKeyFrame() | |
} | |
}() | |
log.Fatal(e.Start(":8080")) | |
} | |
func handler(c echo.Context) error { | |
return c.String(http.StatusOK, "Hello") | |
} | |
func websocketHandler(c echo.Context) error { | |
unsafeConn, err := upgrader.Upgrade(c.Response(), c.Request(), nil) | |
if err != nil { | |
log.Print("upgrade:", err) | |
return err | |
} | |
conn := &threadSafeWriter{unsafeConn, sync.Mutex{}} | |
// When this frame returns close the websocket | |
defer conn.Close() | |
peerConnection, err := webrtc.NewPeerConnection(webrtc.Configuration{ | |
ICEServers: []webrtc.ICEServer{ | |
{ | |
URLs: []string{"stun:stun.l.google.com:19302"}, | |
}, | |
}, | |
SDPSemantics: webrtc.SDPSemanticsUnifiedPlanWithFallback, | |
}) | |
if err != nil { | |
log.Println(err) | |
return err | |
} | |
// When this frame returns close the PeerConnection | |
defer peerConnection.Close() | |
// Accept one audio and one video track incoming | |
for _, typ := range []webrtc.RTPCodecType{webrtc.RTPCodecTypeVideo, webrtc.RTPCodecTypeAudio} { | |
if _, err := peerConnection.AddTransceiverFromKind(typ, webrtc.RTPTransceiverInit{ | |
Direction: webrtc.RTPTransceiverDirectionRecvonly, | |
}); err != nil { | |
log.Println(err) | |
return err | |
} | |
} | |
listLock.Lock() | |
peerConnections = append(peerConnections, peerConnectionState{peerConnection, conn}) | |
listLock.Unlock() | |
// Trickle ICE. Emit server candidate to client | |
// this is called just after the new PeerConnection is created because in the configuration iceserver are specified even though we are not doing this here. | |
peerConnection.OnICECandidate(func(i *webrtc.ICECandidate) { | |
if i == nil { | |
return | |
} | |
candidateString, err := json.Marshal(i.ToJSON()) | |
if err != nil { | |
log.Println(err) | |
return | |
} | |
if writeErr := conn.WriteJSON(&websocketMessage{ | |
Event: "candidate", | |
Data: string(candidateString), | |
}); writeErr != nil { | |
log.Println(writeErr) | |
} | |
}) | |
peerConnection.OnConnectionStateChange(func(pcs webrtc.PeerConnectionState) { | |
switch pcs { | |
case webrtc.PeerConnectionStateFailed: | |
if err := peerConnection.Close(); err != nil { | |
log.Println(err) | |
} | |
case webrtc.PeerConnectionStateClosed: | |
signalPeerConnections() | |
} | |
}) | |
peerConnection.OnTrack(func(tr *webrtc.TrackRemote, r *webrtc.RTPReceiver) { | |
// create a track to fan out our incoming video to all peers | |
trackLocal := addTrack(tr) | |
defer removeTrack(trackLocal) | |
buf := make([]byte, 1500) | |
for { | |
i, _, err := tr.Read(buf) | |
if err != nil { | |
return | |
} | |
if _, err := trackLocal.Write(buf[:i]); err != nil { | |
return | |
} | |
} | |
}) | |
signalPeerConnections() | |
message := &websocketMessage{} | |
for { | |
_, raw, err := conn.ReadMessage() | |
if err != nil { | |
log.Println(err) | |
return err | |
} else if err := json.Unmarshal(raw, &message); err != nil { | |
log.Println(err) | |
return err | |
} | |
switch message.Event { | |
case "candidate": | |
candidate := webrtc.ICECandidateInit{} | |
if err := json.Unmarshal([]byte(message.Data), &candidate); err != nil { | |
log.Println(err) | |
return err | |
} | |
if err := peerConnection.AddICECandidate(candidate); err != nil { | |
log.Println(err) | |
return err | |
} | |
case "answer": | |
answer := webrtc.SessionDescription{} | |
if err := json.Unmarshal([]byte(message.Data), &answer); err != nil { | |
log.Println(err) | |
return err | |
} | |
if err := peerConnection.SetRemoteDescription(answer); err != nil { | |
log.Println(err) | |
return err | |
} | |
} | |
} | |
} | |
func addTrack(t *webrtc.TrackRemote) *webrtc.TrackLocalStaticRTP { | |
listLock.Lock() | |
defer func() { | |
listLock.Unlock() | |
signalPeerConnections() | |
}() | |
// here remote tracks are being converted to localtracks to be added to the server peerConnection are can be forwarding to other clients | |
trackLocal, err := webrtc.NewTrackLocalStaticRTP(t.Codec().RTPCodecCapability, t.ID(), t.StreamID()) | |
if err != nil { | |
panic(err) | |
} | |
trackLocals[trackLocal.ID()] = trackLocal | |
return trackLocal | |
} | |
func removeTrack(t *webrtc.TrackLocalStaticRTP) { | |
listLock.Lock() | |
defer func() { | |
listLock.Unlock() | |
signalPeerConnections() | |
}() | |
delete(trackLocals, t.ID()) | |
} | |
func signalPeerConnections() { | |
listLock.Lock() | |
defer func() { | |
listLock.Unlock() | |
dispatchKeyFrame() | |
}() | |
attemptSync := func() (tryAgain bool) { | |
for i := range peerConnections { | |
if peerConnections[i].peerConnection.ConnectionState() == webrtc.PeerConnectionStateClosed { | |
peerConnections = append(peerConnections[:i], peerConnections[i+1:]...) | |
return true | |
} | |
existingSenders := map[string]bool{} | |
for _, sender := range peerConnections[i].peerConnection.GetSenders() { | |
if sender.Track() == nil { | |
continue | |
} | |
existingSenders[sender.Track().ID()] = true | |
if _, ok := trackLocals[sender.Track().ID()]; !ok { | |
if err := peerConnections[i].peerConnection.RemoveTrack(sender); err != nil { | |
return true | |
} | |
} | |
} | |
// Don't receive videos we are sending, make sure we don't have loopback | |
for _, receiver := range peerConnections[i].peerConnection.GetReceivers() { | |
if receiver.Track() == nil { | |
continue | |
} | |
existingSenders[receiver.Track().ID()] = true | |
} | |
// Add all the track we are not sending yet to the PeerConnection (here we just not add tracks of the peerConnection itself to prevent loop) | |
for trackID := range trackLocals { | |
if _, ok := existingSenders[trackID]; !ok { | |
if _, err := peerConnections[i].peerConnection.AddTrack(trackLocals[trackID]); err != nil { | |
return true | |
} | |
} | |
} | |
offer, err := peerConnections[i].peerConnection.CreateOffer(nil) | |
if err != nil { | |
return true | |
} | |
if err = peerConnections[i].peerConnection.SetLocalDescription(offer); err != nil { | |
return true | |
} | |
offerString, err := json.Marshal(offer) | |
if err != nil { | |
return true | |
} | |
if err = peerConnections[i].websocket.WriteJSON(&websocketMessage{ | |
Event: "offer", | |
Data: string(offerString), | |
}); err != nil { | |
return true | |
} | |
} | |
return false | |
} | |
for syncAttempt := 0; ; syncAttempt++ { | |
if syncAttempt == 25 { | |
go func() { | |
time.Sleep(3 * time.Second) | |
signalPeerConnections() | |
}() | |
return | |
} | |
if !attemptSync() { | |
break | |
} | |
} | |
} | |
func dispatchKeyFrame() { | |
listLock.Lock() | |
defer listLock.Unlock() | |
for i := range peerConnections { | |
for _, receiver := range peerConnections[i].peerConnection.GetReceivers() { | |
if receiver.Track() == nil { | |
continue | |
} | |
_ = peerConnections[i].peerConnection.WriteRTCP([]rtcp.Packet{ | |
&rtcp.PictureLossIndication{ | |
MediaSSRC: uint32(receiver.Track().SSRC()), | |
}, | |
}) | |
} | |
} | |
} | |
func (t *threadSafeWriter) WriteJSON(v interface{}) error { | |
t.Lock() | |
defer t.Unlock() | |
return t.Conn.WriteJSON(v) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment