Skip to content

Instantly share code, notes, and snippets.

@mmitou
Created June 1, 2021 04:52
Show Gist options
  • Save mmitou/fbc95752316b3a75510d7b4e8fb9702e to your computer and use it in GitHub Desktop.
Save mmitou/fbc95752316b3a75510d7b4e8fb9702e to your computer and use it in GitHub Desktop.
websocketを使うwebrtc
<!DOCTYPE html>
<html>
<head>
<title>signaling</title>
</head>
<body>
<!-- Two fixed-size video elements; main.js assigns the local camera stream
     to #localVideo and the first stream of the remote track to #remoteVideo. -->
<div class="video">
<video id="localVideo" width="320" height="240" style="border: 1px solid black;" autoplay></video>
<video id="remoteVideo" width="320" height="240" style="border: 1px solid black;" autoplay></video>
</div>
<!-- main.js attaches a click listener to #sdpStartButton that opens the
     signaling WebSocket and starts the offer/answer exchange. -->
<div class="sdp">
<button id="sdpStartButton">open</button>
</div>
<!-- Loaded at the end of <body>, after the elements it looks up by id. -->
<script src="main.js"></script>
</body>
</html>
package main
import (
	"context"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/go-redis/redis/v8"
	"github.com/gorilla/websocket"
	"github.com/labstack/echo/v4"
	"github.com/rs/zerolog"
	"github.com/rs/zerolog/log"
	"github.com/rs/zerolog/pkgerrors"
)
// wsBufferSize is the size, in bytes, used for both the read and the write
// buffer of every upgraded WebSocket connection.
const wsBufferSize = 1024

// upgrader turns incoming HTTP requests into WebSocket connections.
var upgrader = websocket.Upgrader{
	ReadBufferSize:  wsBufferSize,
	WriteBufferSize: wsBufferSize,
}
// wsclient is one connected WebSocket peer together with the room it joined.
type wsclient struct {
	// roomID is the room taken from the request URL; id is the identifier
	// assigned to this connection when it was registered.
	roomID, id string
	// conn is the upgraded WebSocket connection for this peer.
	conn *websocket.Conn
}
// sender writes every message received on snd to the client's WebSocket.
//
// It returns when snd is closed or when a write fails. On the way out it
// closes the connection and then reports the client on unregister.
func (c wsclient) sender(snd <-chan wsMessage, unregister chan<- wsclient) {
	// Deferred calls run last-in first-out: the connection is closed first,
	// then the client is reported on unregister.
	defer func() { unregister <- c }()
	defer c.conn.Close()

	for {
		out, open := <-snd
		if !open {
			return
		}
		err := c.conn.WriteMessage(out.messageType, out.payload)
		if err == nil {
			continue
		}
		log.Debug().Err(err).Msg("c.conn.WriteMessage")
		return
	}
}
// receiver reads messages from the client's WebSocket and forwards each one,
// tagged with the client's room and id, on msg.
//
// It returns when a read fails or when ctx is cancelled. On the way out it
// closes the connection and reports the client on unregister.
//
// Both channel sends are guarded by ctx.Done() so that the goroutine cannot
// block forever once the consumer of msg/unregister has stopped.
func (c wsclient) receiver(ctx context.Context, msg chan<- message, unregister chan<- wsclient) {
	defer func() {
		c.conn.Close()
		select {
		case unregister <- c:
		case <-ctx.Done():
			// Nobody is guaranteed to be listening any more; give up.
		}
	}()
	for {
		mt, p, err := c.conn.ReadMessage()
		if err != nil {
			log.Debug().Err(err).Msg("c.conn.ReadMessage")
			return
		}
		in := message{
			roomID:    c.roomID,
			clientID:  c.id,
			wsMessage: wsMessage{messageType: mt, payload: p},
		}
		select {
		case msg <- in:
		case <-ctx.Done():
			return
		}
	}
}
type (
	// wsMessage is a single WebSocket frame: its message type and raw body.
	wsMessage struct {
		messageType int
		payload     []byte
	}

	// message is a wsMessage annotated with the room and client it came from.
	message struct {
		roomID, clientID string
		wsMessage
		err error
	}
)
// registrar is the send side of the channel on which newly upgraded
// WebSocket clients are handed over for registration.
type registrar chan<- wsclient

// i counts the clients seen so far; it is the numeric suffix of client ids.
// It must only be accessed while holding iMu.
var i = 0

// iMu guards i. HTTP handlers run concurrently, so an unsynchronised i++
// would be a data race and could hand the same id to two clients.
var iMu sync.Mutex

// nextClientID increments the shared counter and returns the id derived from
// the new value ("hello<N>").
func nextClientID() string {
	iMu.Lock()
	defer iMu.Unlock()
	i++
	return fmt.Sprintf("hello%d", i)
}

// registerWebsocket is the echo handler for the WebSocket endpoint. It
// upgrades the request, builds a wsclient from the "id" path parameter (used
// as the room id) and a freshly allocated client id, and sends it on r.
//
// It returns the upgrade error if the upgrade fails, nil otherwise.
func (r registrar) registerWebsocket(c echo.Context) error {
	// The id is allocated before the upgrade, as before, so a failed upgrade
	// still consumes a number.
	id := nextClientID()
	roomID := c.Param("id")
	conn, err := upgrader.Upgrade(c.Response(), c.Request(), nil)
	if err != nil {
		return err
	}
	r <- wsclient{roomID: roomID, id: id, conn: conn}
	return nil
}
// runPubSub starts the signaling hub goroutine and returns the channel on
// which new WebSocket clients are registered.
//
// The hub connects to Redis at localhost:6379 and subscribes to the "sdp"
// channel. For every registered client it starts a sender and a receiver
// goroutine. Messages read from clients are published to "sdp" prefixed with
// "<roomID>\n<clientID>\n"; messages arriving from "sdp" are expected to have
// the form "<roomID>\n<from>\n<to>\n<body>" and are forwarded unchanged to
// client <to> in <roomID>, or to every client of the room except <from> when
// <to> is "*". A per-room counter is kept in Redis under the room id
// (INCR on register, DECR on unregister).
//
// The hub stops when ctx is cancelled or the subscription channel is closed.
// A malformed payload or a failed Redis command is logged and skipped
// instead of terminating the hub, so that one bad message cannot leave every
// later registration blocked.
func runPubSub(ctx context.Context) registrar {
	register := make(chan wsclient)
	go func() {
		rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379", Password: "", DB: 0})
		defer rdb.Close()
		sub := rdb.Subscribe(ctx, "sdp")
		defer sub.Close() // runs before rdb.Close
		incoming := sub.Channel()

		// rooms maps room id -> client id -> that client's outgoing queue.
		rooms := make(map[string]map[string]chan<- wsMessage)
		unregister := make(chan wsclient)
		msg := make(chan message)

		// remove forgets a client: it closes the client's outgoing queue
		// (which ends its sender), drops empty rooms and decrements the
		// room counter. Unknown clients are ignored, because both the
		// sender and the receiver report the same client.
		remove := func(client wsclient) {
			members, ok := rooms[client.roomID]
			if !ok {
				return
			}
			snd, ok := members[client.id]
			if !ok {
				return
			}
			close(snd)
			delete(members, client.id)
			if len(members) == 0 {
				delete(rooms, client.roomID)
			}
			if err := rdb.Decr(ctx, client.roomID).Err(); err != nil {
				log.Error().Err(err).Msg("rdb.Decr")
			}
		}

		// deliver hands m to one client. While waiting it keeps serving
		// unregister: a sender that already exited is itself blocked on
		// unregister, so a plain blocking send here could never complete.
		// The queue is looked up again on every round so a queue closed by
		// remove is never sent on.
		deliver := func(roomID, id string, m wsMessage) {
			for {
				snd, ok := rooms[roomID][id]
				if !ok {
					return
				}
				select {
				case snd <- m:
					return
				case client := <-unregister:
					remove(client)
				case <-ctx.Done():
					return
				}
			}
		}

		for {
			select {
			case <-ctx.Done():
				return

			case m := <-msg:
				payload := append([]byte(m.roomID+"\n"+m.clientID+"\n"), m.payload...)
				if err := rdb.Publish(ctx, "sdp", payload).Err(); err != nil {
					log.Debug().Err(err).Msg("rdb.Publish")
				}

			case client := <-unregister:
				remove(client)

			case client := <-register:
				if err := rdb.Incr(ctx, client.roomID).Err(); err != nil {
					log.Error().Err(err).Msg("rdb.Incr")
					// The client is not tracked, so release its connection.
					client.conn.Close()
					continue
				}
				members, ok := rooms[client.roomID]
				if !ok {
					members = make(map[string]chan<- wsMessage)
					rooms[client.roomID] = members
				}
				snd := make(chan wsMessage)
				members[client.id] = snd
				go client.sender(snd, unregister)
				go client.receiver(ctx, msg, unregister)

			case rmsg, ok := <-incoming:
				if !ok {
					// Without the subscription nothing can be relayed.
					log.Error().Msg("sub.Channel closed")
					return
				}
				ss := strings.Split(rmsg.Payload, "\n")
				if len(ss) != 4 {
					// Clients control this text; drop it and keep running.
					log.Error().Str("payload", rmsg.Payload).Msg("parseHeader")
					continue
				}
				roomID, from, to := ss[0], ss[1], ss[2]
				out := wsMessage{messageType: websocket.TextMessage, payload: []byte(rmsg.Payload)}
				if to == "*" {
					// Entries removed by deliver during the loop are simply
					// not visited; deleting while ranging is allowed.
					for id := range rooms[roomID] {
						if id != from {
							deliver(roomID, id, out)
						}
					}
					continue
				}
				deliver(roomID, to, out)
			}
		}
	}()
	return registrar(register)
}
// main configures logging, starts the signaling hub and serves both the
// WebSocket endpoint and the static files in ./web on port 8080.
func main() {
	// Global zerolog settings: debug level, stack traces, nanosecond stamps.
	zerolog.SetGlobalLevel(zerolog.DebugLevel)
	zerolog.ErrorStackMarshaler = pkgerrors.MarshalStack
	zerolog.TimeFieldFormat = time.RFC3339Nano

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	hub := runPubSub(ctx)

	e := echo.New()
	// Log every handler error before delegating to echo's default handler.
	e.HTTPErrorHandler = func(err error, c echo.Context) {
		detail := fmt.Sprintf("%+v", err)
		log.Debug().Err(err).Msg(detail)
		e.DefaultHTTPErrorHandler(err, c)
	}
	e.GET("/rooms/:id/ws", hub.registerWebsocket)
	e.Static("/", "./web")

	e.Logger.Fatal(e.Start(":8080"))
}
"use strict";

/**
 * Returns a promise that resolves once `ws` is open.
 *
 * Resolves immediately if the socket is already open, rejects if it is
 * closing/closed or closes before it ever opens. Needed because calling
 * `send()` on a socket that is still connecting throws.
 *
 * @param {WebSocket} ws
 * @returns {Promise<void>}
 */
function waitForOpen(ws) {
  if (ws.readyState === WebSocket.OPEN) {
    return Promise.resolve();
  }
  if (ws.readyState !== WebSocket.CONNECTING) {
    return Promise.reject(new Error("websocket is not open"));
  }
  return new Promise((resolve, reject) => {
    ws.addEventListener("open", () => resolve(), { once: true });
    ws.addEventListener(
      "close",
      () => reject(new Error("websocket closed before opening")),
      { once: true }
    );
  });
}

// Clicking the button opens the signaling socket, captures the camera,
// creates the peer connection and broadcasts an offer to the room.
document
  .getElementById("sdpStartButton")
  .addEventListener("click", async (event) => {
    event.target.disabled = true;
    try {
      const ws = new WebSocket("ws://localhost:8080/rooms/hello/ws");
      ws.addEventListener("open", () => console.log("open"));
      ws.addEventListener("close", () => console.log("close"));
      ws.addEventListener("error", (event) => {
        console.log("wserror:", event);
      });

      const stream = await navigator.mediaDevices.getUserMedia({
        audio: false,
        video: true,
      });
      const localVideo = document.getElementById("localVideo");
      localVideo.srcObject = stream;

      const conn = new RTCPeerConnection({
        iceServers: [
          {
            urls: ["stun:stun.l.google.com:19302"],
          },
        ],
      });
      conn.addEventListener("connectionstatechange", console.log);

      // Local ICE candidates are collected here and sent in one batch once
      // the remote answer has arrived.
      const candidates = [];
      conn.addEventListener("icecandidate", (event) => {
        if (event.candidate) {
          console.log("icecandidate", event.candidate);
          candidates.push(event.candidate);
        }
      });
      conn.addEventListener("icecandidateerror", ({ errorCode, errorText }) => {
        console.log("icecandidateerror", { errorCode, errorText });
      });
      conn.addEventListener("iceconnectionstatechange", (event) => {
        console.log("iceconnectionstatechange", {
          iceConnectionState: event.currentTarget.iceConnectionState,
          iceGatheringState: event.currentTarget.iceGatheringState,
        });
      });
      conn.addEventListener("icegatheringstatechange", (event) => {
        console.log("icegatheringstatechange", {
          iceConnectionState: event.currentTarget.iceConnectionState,
          iceGatheringState: event.currentTarget.iceGatheringState,
        });
      });
      conn.addEventListener("negotiationneeded", console.log);
      conn.addEventListener("statsended", console.log);
      conn.addEventListener("track", (event) => {
        console.log("ontrack", "remote video start");
        const stream = event.streams[0];
        document.getElementById("remoteVideo").srcObject = stream;
      });

      stream.getTracks().forEach((track) => {
        conn.addTrack(track, stream);
      });

      const offer = await conn.createOffer();
      await conn.setLocalDescription(offer);
      console.log("offer", offer);

      // Incoming frames are "<roomId>\n<from>\n<to>\n<json>". Replies are
      // sent as "<to>\n<json>".
      ws.addEventListener("message", async (event) => {
        console.log("wsmessage", event);
        if (typeof event.data !== "string") {
          console.log("invalid data type");
          return;
        }
        // The listener is async, so anything thrown here (bad JSON, a
        // rejected WebRTC call) would otherwise be an unhandled rejection.
        try {
          const [roomId, from, to, payload] = event.data.split("\n");
          console.log("onmessage", roomId, from, to);
          const obj = JSON.parse(payload);
          console.log(obj);
          if (obj.type === "offer") {
            await conn.setRemoteDescription(obj);
            const answer = await conn.createAnswer();
            await conn.setLocalDescription(answer);
            console.log("recieved offer, send answer", answer);
            ws.send(from + "\n" + JSON.stringify(answer));
          } else if (obj.type === "answer") {
            await conn.setRemoteDescription(obj);
            console.log("recieved answer and send icecandidates", candidates);
            ws.send(
              from + "\n" + JSON.stringify({ type: "icecandidate", candidates })
            );
          } else if (obj.type === "icecandidate") {
            // Log what was received, not the local candidate list.
            console.log("recieved icecandidates", obj.candidates);
            for (const candidate of obj.candidates) {
              await conn.addIceCandidate(candidate);
            }
          }
        } catch (err) {
          console.log(err);
        }
      });

      // The socket may still be connecting at this point; sending then
      // would throw.
      await waitForOpen(ws);
      ws.send("*\n" + JSON.stringify(offer));
    } catch (err) {
      console.log(err);
    }
  });
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment