Skip to content

Instantly share code, notes, and snippets.

@mmitou
Created May 30, 2021 06:17
Show Gist options
  • Save mmitou/795bdcef2c7cd59472e58a33c45253d5 to your computer and use it in GitHub Desktop.
Save mmitou/795bdcef2c7cd59472e58a33c45253d5 to your computer and use it in GitHub Desktop.
不要なコードを削除
package main
import (
	"context"
	"errors"
	"fmt"
	"strings"
	"sync/atomic"
	"time"

	"github.com/go-redis/redis/v8"
	"github.com/gorilla/websocket"
	"github.com/labstack/echo/v4"
	"github.com/rs/zerolog"
	"github.com/rs/zerolog/log"
	"github.com/rs/zerolog/pkgerrors"
)
// wsBufferSize is the size, in bytes, used for both the read and the write
// buffer of upgraded websocket connections.
const wsBufferSize = 1024

// upgrader turns incoming HTTP requests into websocket connections. Only the
// buffer sizes are configured; every other option keeps its zero value.
var upgrader = websocket.Upgrader{
	ReadBufferSize:  wsBufferSize,
	WriteBufferSize: wsBufferSize,
}
// wsclient describes one connected websocket peer.
type wsclient struct {
	// roomID is the room the peer joined (taken from the request route in
	// registerWebsocket); id is the identifier generated for the peer at
	// registration time.
	roomID, id string
	// conn is the upgraded websocket connection used to talk to the peer.
	conn *websocket.Conn
}
// sender writes every message received on snd to the client's websocket
// connection. It returns when snd is closed or when a write fails.
//
// On exit the connection is closed and c is sent on unregister exactly once.
// The goroutine that feeds snd is the same one that receives from unregister
// (see runPubSub), and both channels are unbuffered. If sender simply blocked
// on unregister after a failed write, that goroutine could at the same moment
// be blocked handing the next message to snd, and neither side would ever make
// progress. To avoid that, messages still arriving on snd are discarded while
// waiting for the unregister send to be accepted.
func (c wsclient) sender(snd <-chan wsMessage, unregister chan<- wsclient) {
	defer func() {
		c.conn.Close()
		for {
			select {
			case unregister <- c:
				return
			case _, ok := <-snd:
				if !ok {
					// snd was closed: a nil channel is never ready, so from
					// now on only the unregister case can be selected.
					snd = nil
				}
			}
		}
	}()
	for msg := range snd {
		if err := c.conn.WriteMessage(msg.messageType, msg.payload); err != nil {
			log.Debug().Err(err).Msg("c.conn.WriteMessage")
			return
		}
	}
}
// reciever reads messages from the client's websocket connection and publishes
// each one on the Redis channel "sdp", prefixed with "<roomID>@<id>@" so that
// subscribers can tell which room and client it came from.
//
// It returns on the first read or publish error. On exit the connection is
// closed and then c is sent on unregister.
func (c wsclient) reciever(ctx context.Context, rdb *redis.Client, unregister chan<- wsclient) {
	// Deferred calls run last-in-first-out: the connection is closed first,
	// then the client is reported on unregister.
	defer func() { unregister <- c }()
	defer c.conn.Close()

	prefix := c.roomID + "@" + c.id + "@"
	for {
		_, data, readErr := c.conn.ReadMessage()
		if readErr != nil {
			log.Debug().Err(readErr).Msg("c.conn.ReadMessage")
			return
		}

		framed := make([]byte, 0, len(prefix)+len(data))
		framed = append(framed, prefix...)
		framed = append(framed, data...)

		if pubErr := rdb.Publish(ctx, "sdp", framed).Err(); pubErr != nil {
			log.Debug().Err(pubErr).Msg("rdb.Publish")
			return
		}
	}
}
type (
	// wsMessage is a single websocket frame: the message type that is handed
	// to the connection's WriteMessage call, and the bytes to send.
	wsMessage struct {
		messageType int
		payload     []byte
	}

	// message is a wsMessage tagged with a room id and a client id, plus an
	// error slot.
	message struct {
		roomID, clientID string
		wsMessage
		err error
	}
)
// registrar is the send-only channel through which newly connected websocket
// clients are handed over for registration.
type registrar chan<- wsclient

// i counts the websocket registrations attempted so far and is used to derive
// client ids. HTTP handlers run concurrently, so it must only be modified
// through sync/atomic.
var i int64

// registerWebsocket is the echo handler for websocket endpoints. It upgrades
// the request to a websocket connection and sends the resulting client, tagged
// with the "id" route parameter as its room, on r.
//
// It returns the upgrade error if the handshake fails and nil otherwise. The
// send on r blocks until the client has been accepted by the receiving side.
func (r registrar) registerWebsocket(c echo.Context) error {
	// Take the sequence number atomically: a plain i++ from concurrent
	// handlers is a data race and can hand the same id to two clients.
	seq := atomic.AddInt64(&i, 1)

	conn, err := upgrader.Upgrade(c.Response(), c.Request(), nil)
	if err != nil {
		return err
	}

	r <- wsclient{
		roomID: c.Param("id"),
		id:     fmt.Sprintf("hello%d", seq),
		conn:   conn,
	}
	return nil
}
// parseRedisPayload splits p at its first "@" and returns the text before it
// (the room id) and everything after it. An error is returned when p has no
// "@" or when the room id would be empty.
func parseRedisPayload(p string) (string, string, error) {
	parts := strings.SplitN(p, "@", 2)
	if len(parts) != 2 || parts[0] == "" {
		return "", "", errors.New("no room id")
	}
	return parts[0], parts[1], nil
}
// runPubSub starts the hub goroutine and returns the registrar used to hand
// new websocket clients to it.
//
// The hub owns all room state. It
//   - subscribes to the Redis channel "sdp" and forwards each message to every
//     client of the room named by the payload prefix (see parseRedisPayload),
//   - starts a sender and a reciever goroutine for each registered client,
//   - drops a client and closes its outbound channel when that client is
//     reported on the internal unregister channel.
//
// The hub stops when ctx is cancelled: it closes the subscription and every
// client's outbound channel, then waits for the unregister notifications of
// the client goroutines it started so they are not left blocked.
func runPubSub(ctx context.Context) registrar {
	register := make(chan wsclient)
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379", Password: "", DB: 0})

	go func() {
		rooms := make(map[string]map[string]chan<- wsMessage)
		unregister := make(chan wsclient)

		// pending is the number of unregister notifications still expected:
		// the sender and the reciever of a client each send exactly one.
		pending := 0

		sub := rdb.Subscribe(ctx, "sdp")
		defer sub.Close()
		incoming := sub.Channel()

		// remove accounts for one unregister notification and, if the client
		// is still known, closes its outbound channel and forgets it.
		remove := func(client wsclient) {
			pending--
			room, ok := rooms[client.roomID]
			if !ok {
				return
			}
			snd, ok := room[client.id]
			if !ok {
				return
			}
			close(snd)
			delete(room, client.id)
			if len(room) == 0 {
				delete(rooms, client.roomID)
			}
		}

		// deliver hands msg to one client. While the client is not ready it
		// keeps serving unregister notifications, because a client whose
		// sender has already stopped will never read from snd and is itself
		// waiting on unregister. It reports false when ctx was cancelled.
		deliver := func(roomID, id string, snd chan<- wsMessage, msg wsMessage) bool {
			for {
				select {
				case snd <- msg:
					return true
				case gone := <-unregister:
					remove(gone)
					if gone.roomID == roomID && gone.id == id {
						// snd has just been closed; sending again would panic.
						return true
					}
				case <-ctx.Done():
					return false
				}
			}
		}

		// shutdown closes every outbound channel so the senders stop, then
		// drains the notifications the client goroutines still have to send.
		shutdown := func() {
			for _, room := range rooms {
				for id, snd := range room {
					close(snd)
					delete(room, id)
				}
			}
			for pending > 0 {
				<-unregister
				pending--
			}
		}

		for {
			select {
			case <-ctx.Done():
				shutdown()
				return

			case client := <-unregister:
				remove(client)

			case client := <-register:
				room, ok := rooms[client.roomID]
				if !ok {
					room = make(map[string]chan<- wsMessage)
					rooms[client.roomID] = room
				}
				snd := make(chan wsMessage)
				room[client.id] = snd
				pending += 2
				go client.sender(snd, unregister)
				go client.reciever(ctx, rdb, unregister)

			case rmsg, ok := <-incoming:
				if !ok {
					// The subscription channel was closed. Stop selecting on
					// it instead of dereferencing a nil message; clients can
					// still be registered and unregistered.
					log.Debug().Msg("sub.Channel closed")
					incoming = nil
					continue
				}
				roomID, payload, err := parseRedisPayload(rmsg.Payload)
				if err != nil {
					// One malformed message must not stop the hub.
					log.Debug().Err(err).Msg("parseRedisPayload")
					continue
				}
				// remove may delete entries of this map while it is being
				// ranged over; Go guarantees deleted entries that were not
				// reached yet are simply not produced.
				for id, snd := range rooms[roomID] {
					msg := wsMessage{messageType: websocket.TextMessage, payload: []byte(payload)}
					if !deliver(roomID, id, snd, msg) {
						shutdown()
						return
					}
				}
			}
		}
	}()

	return registrar(register)
}
// main configures zerolog, starts the pub/sub hub and serves HTTP on :8080:
// the websocket endpoint at /rooms/:id/ws and static files from ./web.
func main() {
	// Logging: debug level, nanosecond timestamps, stack traces via pkgerrors.
	zerolog.SetGlobalLevel(zerolog.DebugLevel)
	zerolog.TimeFieldFormat = time.RFC3339Nano
	zerolog.ErrorStackMarshaler = pkgerrors.MarshalStack

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	e := echo.New()
	hub := runPubSub(ctx)

	// Log every handler error in verbose form before echo's default handling.
	e.HTTPErrorHandler = func(err error, c echo.Context) {
		detail := fmt.Sprintf("%+v", err)
		log.Debug().Err(err).Msg(detail)
		e.DefaultHTTPErrorHandler(err, c)
	}

	e.GET("/rooms/:id/ws", hub.registerWebsocket)
	e.Static("/", "./web")

	// Start blocks while serving; whatever error it returns is passed to the
	// logger's Fatal.
	serveErr := e.Start(":8080")
	e.Logger.Fatal(serveErr)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment