Skip to content

Instantly share code, notes, and snippets.

@jvanveen
Last active September 11, 2020 15:13
Show Gist options
  • Save jvanveen/26026e78010889a26bc85c01e61b0294 to your computer and use it in GitHub Desktop.
Save jvanveen/26026e78010889a26bc85c01e61b0294 to your computer and use it in GitHub Desktop.
ion-sfu grpc/json-rpc
// Package main contains an entrypoint for running an ion-sfu instance.
package main
import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"net"
http "net/http"
"os"
"time"
"github.com/gorilla/websocket"
sfu "github.com/pion/ion-sfu/pkg"
"github.com/pion/ion-sfu/pkg/log"
"github.com/pion/webrtc/v3"
"github.com/sourcegraph/jsonrpc2"
websocketjsonrpc2 "github.com/sourcegraph/jsonrpc2/websocket"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/spf13/viper"
pb "github.com/pion/ion-sfu/cmd/server/grpc/proto"
)
// contextKey is a private type for context keys defined in this package.
// Using a dedicated type keeps these keys from colliding with keys that
// other packages store in the same context.
type contextKey struct {
	// name labels the key.
	name string
}

// peerCtxKey is the context key under which the per-connection
// *peerContext is stored.
var peerCtxKey = &contextKey{name: "peer"}
// grpcConfig holds the gRPC section of the configuration file.
type grpcConfig struct {
	// Port is read from the "port" key of the section.
	Port string `mapstructure:"port"`
}
// Config is the top-level configuration of this binary: the sfu settings
// plus the settings of the gRPC front end.
type Config struct {
	// The embedded sfu.Config is tagged "squash" for mapstructure decoding.
	sfu.Config `mapstructure:",squash"`

	// GRPC is decoded from the "grpc" section.
	GRPC grpcConfig `mapstructure:"grpc"`
}
// peerContext is the mutable per-connection state of a JSON-RPC client.
// A pointer to it is stored in the connection's context under peerCtxKey.
type peerContext struct {
	// peer is nil until a "join" request has succeeded on the connection.
	peer *sfu.WebRTCTransport
}
// forContext returns the *peerContext stored in ctx under peerCtxKey.
// It returns nil when ctx carries no such value.
func forContext(ctx context.Context) *peerContext {
	if pc, ok := ctx.Value(peerCtxKey).(*peerContext); ok {
		return pc
	}
	return nil
}
// Package-level state shared by flag parsing, config loading and the
// RPC handlers.
var (
	// conf is filled in by load from the config file.
	conf Config
	// file is the config file path (flag -c).
	file string
	// gRPCAddr is the gRPC listen address (flag -g).
	gRPCAddr string
	// jsonRPCAddr is the JSON-RPC listen address (flag -j).
	jsonRPCAddr string

	// errNoPeer is reported when a request needs a joined peer but the
	// connection has none.
	errNoPeer = errors.New("no peer exists")
)
// server is the signalling front end. The same value is registered as the
// gRPC SFU service (Signal) and used as the JSON-RPC handler (Handle).
type server struct {
	pb.UnimplementedSFUServer

	// sfu is the SFU instance that new peer transports are created on.
	sfu *sfu.SFU
}
// Join is the parameter object of the JSON-RPC "join" request, the first
// request a client sends to create its peer connection.
type Join struct {
	// Sid identifies the session to join.
	Sid string `json:"sid"`
	// Offer is the client's initial SDP offer.
	Offer webrtc.SessionDescription `json:"offer"`
}
// Negotiation is the parameter object of the JSON-RPC "offer" and "answer"
// requests used to renegotiate an existing peer connection.
type Negotiation struct {
	// Desc is the session description sent by the client.
	Desc webrtc.SessionDescription `json:"desc"`
}
// Trickle is the parameter object of the JSON-RPC "trickle" request, which
// delivers one ICE candidate from the client.
type Trickle struct {
	// Candidate is the ICE candidate to add to the peer connection.
	Candidate webrtc.ICECandidateInit `json:"candidate"`
}
// NewRPC returns a server backed by a new SFU instance built from the
// sfu part of the loaded configuration (conf.Config).
func NewRPC() *server {
	s := new(server)
	s.sfu = sfu.NewSFU(conf.Config)
	return s
}
// portRangeLimit is the minimum width (max - min) that load accepts for a
// configured ICE port range.
const portRangeLimit = 100
// showHelp prints the command-line usage to standard output.
//
// The listed options mirror the flags registered in parse: -c, -g, -j
// and -h.
func showHelp() {
	fmt.Printf("Usage:%s {params}\n", os.Args[0])
	fmt.Println(" -c {config file}")
	// The gRPC address flag is registered as -g, so the help must say -g.
	fmt.Println(" -g {grpc listen addr}")
	fmt.Println(" -j {json-rpc listen addr}")
	fmt.Println(" -h (show help info)")
}
// load reads the TOML config file named by the package-level file variable
// into conf and validates the ICE port range.
//
// It returns true when the file exists, parses, and the port range is
// either absent or a [min, max] pair with max - min >= portRangeLimit.
// On failure it prints the reason to standard output (except for a missing
// file, which is silent) and returns false.
func load() bool {
	if _, err := os.Stat(file); err != nil {
		return false
	}

	viper.SetConfigFile(file)
	viper.SetConfigType("toml")

	if err := viper.ReadInConfig(); err != nil {
		fmt.Printf("config file %s read failed. %v\n", file, err)
		return false
	}

	if err := viper.GetViper().Unmarshal(&conf); err != nil {
		fmt.Printf("sfu config file %s loaded failed. %v\n", file, err)
		return false
	}
	// Only report the level once conf is known to have decoded cleanly.
	fmt.Printf("log level: %s\n", conf.Log.Level)

	if portRange := conf.WebRTC.ICEPortRange; len(portRange) != 0 {
		// Exactly two entries are required; a single entry would otherwise
		// be indexed out of range below.
		if len(portRange) != 2 {
			fmt.Printf("config file %s loaded failed. range port must be [min,max]\n", file)
			return false
		}

		// Compare before subtracting so that max < min is rejected even if
		// the element type is unsigned and the subtraction would wrap.
		lo, hi := portRange[0], portRange[1]
		if hi < lo || hi-lo < portRangeLimit {
			fmt.Printf("config file %s loaded failed. range port must be [min, max] and max - min >= %d\n", file, portRangeLimit)
			return false
		}
	}

	fmt.Printf("config file: %s\n", file)
	return true
}
// parse registers and parses the command-line flags and then loads the
// config file.
//
// It returns false when -h was given or when the config could not be
// loaded. It does not print the usage itself: the caller (main) prints it
// on a false return, so printing here as well would show it twice.
func parse() bool {
	flag.StringVar(&file, "c", "config.toml", "config file")
	flag.StringVar(&gRPCAddr, "g", ":50051", "grpc address to use")
	flag.StringVar(&jsonRPCAddr, "j", ":7000", "json-rpc address to use")
	help := flag.Bool("h", false, "help info")
	flag.Parse()

	// Honour -h before touching the config file, so asking for help works
	// even when no valid config is present.
	if *help {
		return false
	}

	return load()
}
// Handle is the JSON-RPC SFU handler. It dispatches a request on its method
// name:
//
//   - "join":    create the peer for this connection from a Join message and
//     reply with the SDP answer.
//   - "offer":   renegotiate with a client offer (Negotiation) and reply with
//     the SDP answer.
//   - "answer":  apply a client answer (Negotiation) to a server offer.
//   - "trickle": add a client ICE candidate (Trickle).
//
// Requests with any other method are ignored. The per-connection state is
// the *peerContext stored in ctx (see forContext). Failures are logged and,
// where the original protocol did so, reported to the client as a JSON-RPC
// error with code 500.
func (r *server) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) {
	p := forContext(ctx)
	if p == nil {
		// Without per-connection state there is nothing to operate on;
		// fail the request instead of dereferencing a nil pointer.
		log.Errorf("connect: no peer context for connection")
		replyError(ctx, conn, req, errors.New("no peer context"))
		return
	}

	switch req.Method {
	case "join":
		r.handleJoin(ctx, conn, req, p)
	case "offer":
		handleOffer(ctx, conn, req, p)
	case "answer":
		handleAnswer(ctx, conn, req, p)
	case "trickle":
		handleTrickle(ctx, conn, req, p)
	}
}

// replyError sends err to the client as a JSON-RPC error with code 500.
// The send is best-effort: its own error is deliberately ignored.
func replyError(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request, err error) {
	_ = conn.ReplyWithError(ctx, req.ID, &jsonrpc2.Error{
		Code:    500,
		Message: err.Error(),
	})
}

// decodeParams unmarshals the request parameters into v. It returns an
// error instead of panicking when the request carries no params at all.
func decodeParams(req *jsonrpc2.Request, v interface{}) error {
	if req.Params == nil {
		return errors.New("missing params")
	}
	return json.Unmarshal(*req.Params, v)
}

// answerOffer applies a remote offer to peer, creates the matching answer
// and installs it as the local description. Each failing step is logged
// here; the returned answer is only meaningful when the error is nil.
func answerOffer(peer *sfu.WebRTCTransport, offer webrtc.SessionDescription) (webrtc.SessionDescription, error) {
	if err := peer.SetRemoteDescription(offer); err != nil {
		log.Errorf("Offer error: %v", err)
		return webrtc.SessionDescription{}, err
	}

	answer, err := peer.CreateAnswer()
	if err != nil {
		log.Errorf("Offer error: answer=%v err=%v", answer, err)
		return webrtc.SessionDescription{}, err
	}

	if err := peer.SetLocalDescription(answer); err != nil {
		log.Errorf("Offer error: answer=%v err=%v", answer, err)
		return webrtc.SessionDescription{}, err
	}
	return answer, nil
}

// notifyOffer creates a server-side offer on peer, installs it as the local
// description and pushes it to the client as an "offer" notification.
// Failures are logged only.
func notifyOffer(ctx context.Context, conn *jsonrpc2.Conn, peer *sfu.WebRTCTransport) {
	offer, err := peer.CreateOffer()
	if err != nil {
		log.Errorf("CreateOffer error: %v", err)
		return
	}

	if err := peer.SetLocalDescription(offer); err != nil {
		log.Errorf("SetLocalDescription error: %v", err)
		return
	}

	if err := conn.Notify(ctx, "offer", offer); err != nil {
		log.Errorf("error sending offer %s", err)
	}
}

// handleJoin serves the "join" method: it creates the transport for this
// connection, answers the client's offer and wires up the ICE and
// renegotiation callbacks.
func (r *server) handleJoin(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request, p *peerContext) {
	if p.peer != nil {
		log.Errorf("connect: peer already exists for connection")
		replyError(ctx, conn, req, errors.New("peer already exists"))
		return
	}

	var join Join
	if err := decodeParams(req, &join); err != nil {
		log.Errorf("connect: error parsing offer: %v", err)
		replyError(ctx, conn, req, err)
		return
	}

	peer, err := r.sfu.NewWebRTCTransport(join.Sid, join.Offer)
	if err != nil {
		log.Errorf("connect: error creating peer: %v", err)
		replyError(ctx, conn, req, err)
		return
	}
	log.Infof("peer %s join session %s", peer.ID(), join.Sid)

	answer, err := answerOffer(peer, join.Offer)
	if err != nil {
		// The transport was never attached to the connection, so nothing
		// else would close it; release it here.
		peer.Close()
		replyError(ctx, conn, req, err)
		return
	}

	// Notify user of trickle candidates
	peer.OnICECandidate(func(c *webrtc.ICECandidate) {
		log.Debugf("Sending ICE candidate")
		if c == nil {
			// Gathering done
			return
		}
		if err := conn.Notify(ctx, "trickle", c.ToJSON()); err != nil {
			log.Errorf("error sending trickle %s", err)
		}
	})

	peer.OnNegotiationNeeded(func() {
		log.Debugf("on negotiation needed called")
		notifyOffer(ctx, conn, peer)
	})

	p.peer = peer
	_ = conn.Reply(ctx, req.ID, answer)

	// Hack until renegotiation is supported in pion. Force renegotiation in
	// case there are unmatched receivers (i.e. sfu has more than one
	// sender). We just naively create a server offer. It is a noop if things
	// are already matched. We can remove once
	// https://github.com/pion/webrtc/pull/1322 is merged.
	time.Sleep(1000 * time.Millisecond)
	log.Debugf("on negotiation needed called")
	notifyOffer(ctx, conn, peer)
}

// handleOffer serves the "offer" method: the client renegotiates an
// existing peer and receives the new answer as the reply.
func handleOffer(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request, p *peerContext) {
	if p.peer == nil {
		log.Errorf("connect: no peer exists for connection")
		replyError(ctx, conn, req, errNoPeer)
		return
	}
	log.Infof("peer %s offer", p.peer.ID())

	var negotiation Negotiation
	if err := decodeParams(req, &negotiation); err != nil {
		log.Errorf("connect: error parsing offer: %v", err)
		replyError(ctx, conn, req, err)
		return
	}

	// Peer exists, renegotiating existing peer
	answer, err := answerOffer(p.peer, negotiation.Desc)
	if err != nil {
		replyError(ctx, conn, req, err)
		return
	}
	_ = conn.Reply(ctx, req.ID, answer)
}

// handleAnswer serves the "answer" method: the client's answer to a server
// offer is installed as the remote description. No reply is sent on
// success; a failure to apply the description is only logged.
func handleAnswer(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request, p *peerContext) {
	if p.peer == nil {
		log.Errorf("connect: no peer exists for connection")
		replyError(ctx, conn, req, errNoPeer)
		return
	}
	log.Infof("peer %s answer", p.peer.ID())

	var negotiation Negotiation
	if err := decodeParams(req, &negotiation); err != nil {
		log.Errorf("connect: error parsing answer: %v", err)
		replyError(ctx, conn, req, err)
		return
	}

	if err := p.peer.SetRemoteDescription(negotiation.Desc); err != nil {
		log.Errorf("error setting remote description %s", err)
	}
}

// handleTrickle serves the "trickle" method: the client's ICE candidate is
// added to the peer. No reply is sent on success; a failure to add the
// candidate is only logged.
func handleTrickle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request, p *peerContext) {
	log.Debugf("trickle")
	if p.peer == nil {
		log.Errorf("connect: no peer exists for connection")
		replyError(ctx, conn, req, errNoPeer)
		return
	}
	log.Infof("peer %s trickle", p.peer.ID())

	var trickle Trickle
	if err := decodeParams(req, &trickle); err != nil {
		log.Errorf("connect: error parsing candidate: %v", err)
		replyError(ctx, conn, req, err)
		return
	}

	if err := p.peer.AddICECandidate(trickle.Candidate); err != nil {
		log.Errorf("error setting ice candidate %s", err)
	}
}
// Signal is the gRPC SFU handler. It runs a bidirectional signalling stream
// between one client and the sfu.
//
// The client sends one of three payload types:
//  1. `Join` containing the session id and the session offer description.
//     This message must be sent first; a second Join on the same stream is
//     rejected with FailedPrecondition.
//  2. `Negotiate` containing an offer or an answer for renegotiation.
//  3. `Trickle` containing JSON-encoded candidate information for Trickle ICE.
//
// The sfu sends one of three payload types:
//  1. `Join` containing the peer id and the session answer description, in
//     reply to the client's Join.
//  2. `Negotiate` containing a server offer, or the answer to a client offer.
//  3. `Trickle` containing JSON-encoded candidate information for Trickle ICE.
//
// When the stream ends for any reason (client close, cancellation, or an
// error returned from this method) the peer's webrtc transport is closed.
// A clean client close (io.EOF) and a cancelled stream return nil.
//
// NOTE(review): the ICE and negotiation callbacks call stream.Send, and so
// does the receive loop. If the callbacks run on other goroutines these
// sends are concurrent; confirm against the grpc-go stream concurrency
// rules whether they need to be serialised.
func (s *server) Signal(stream pb.SFU_SignalServer) error {
	var peer *sfu.WebRTCTransport

	// Close the transport on every exit path, including the error returns
	// taken after the peer has been created.
	defer func() {
		if peer != nil {
			peer.Close()
		}
	}()

	for {
		in, err := stream.Recv()
		if err != nil {
			if err == io.EOF {
				return nil
			}

			errStatus, _ := status.FromError(err)
			if errStatus.Code() == codes.Canceled {
				return nil
			}

			log.Errorf("signal error %v %v", errStatus.Message(), errStatus.Code())
			return err
		}

		switch payload := in.Payload.(type) {
		case *pb.SignalRequest_Join:
			// Reject a Join without an offer instead of dereferencing nil.
			if payload.Join == nil || payload.Join.Offer == nil {
				log.Errorf("join error: missing offer")
				return status.Errorf(codes.InvalidArgument, "join error %s", "missing offer")
			}
			log.Infof("signal->join called:\n%v", string(payload.Join.Offer.Sdp))

			if peer != nil {
				// already joined
				log.Errorf("peer already exists")
				return status.Errorf(codes.FailedPrecondition, "peer already exists")
			}

			offer := webrtc.SessionDescription{
				Type: webrtc.SDPTypeOffer,
				SDP:  string(payload.Join.Offer.Sdp),
			}

			peer, err = s.sfu.NewWebRTCTransport(payload.Join.Sid, offer)
			if err != nil {
				log.Errorf("join error: %v", err)
				return status.Errorf(codes.InvalidArgument, "join error %s", err)
			}
			log.Infof("peer %s join session %s", peer.ID(), payload.Join.Sid)

			if err = peer.SetRemoteDescription(offer); err != nil {
				log.Errorf("join error: %v", err)
				return status.Errorf(codes.Internal, "join error %s", err)
			}

			var answer webrtc.SessionDescription
			answer, err = peer.CreateAnswer()
			if err != nil {
				log.Errorf("join error: %v", err)
				return status.Errorf(codes.Internal, "join error %s", err)
			}

			if err = peer.SetLocalDescription(answer); err != nil {
				log.Errorf("join error: %v", err)
				return status.Errorf(codes.Internal, "join error %s", err)
			}

			// Notify user of trickle candidates
			peer.OnICECandidate(func(c *webrtc.ICECandidate) {
				if c == nil {
					// Gathering done
					return
				}

				raw, err := json.Marshal(c.ToJSON())
				if err != nil {
					// Nothing valid to send for this candidate.
					log.Errorf("OnIceCandidate error %s", err)
					return
				}

				err = stream.Send(&pb.SignalReply{
					Payload: &pb.SignalReply_Trickle{
						Trickle: &pb.Trickle{
							Init: string(raw),
						},
					},
				})
				if err != nil {
					log.Errorf("OnIceCandidate error %s", err)
				}
			})

			peer.OnNegotiationNeeded(func() {
				log.Debugf("on negotiation needed called")
				offer, err := peer.CreateOffer()
				if err != nil {
					log.Errorf("CreateOffer error: %v", err)
					return
				}

				if err = peer.SetLocalDescription(offer); err != nil {
					log.Errorf("SetLocalDescription error: %v", err)
					return
				}

				err = stream.Send(&pb.SignalReply{
					Payload: &pb.SignalReply_Negotiate{
						Negotiate: &pb.SessionDescription{
							Type: offer.Type.String(),
							Sdp:  []byte(offer.SDP),
						},
					},
				})
				if err != nil {
					log.Errorf("negotiation error %s", err)
				}
			})

			err = stream.Send(&pb.SignalReply{
				Payload: &pb.SignalReply_Join{
					Join: &pb.JoinReply{
						// Report the id of the peer that was just created.
						Pid: peer.ID(),
						Answer: &pb.SessionDescription{
							Type: answer.Type.String(),
							Sdp:  []byte(answer.SDP),
						},
					},
				},
			})
			if err != nil {
				log.Errorf("error sending join response %s", err)
				return status.Errorf(codes.Internal, "join error %s", err)
			}

			// Hack until renegotiation is supported in pion. Force
			// renegotiation in case there are unmatched receivers (i.e. sfu
			// has more than one sender). We just naively create a server
			// offer. It is a noop if things are already matched. We can
			// remove once https://github.com/pion/webrtc/pull/1322 is merged.
			time.Sleep(1000 * time.Millisecond)

			offer, err = peer.CreateOffer()
			if err != nil {
				log.Errorf("CreateOffer error: %v", err)
				return status.Errorf(codes.Internal, "join error %s", err)
			}

			if err = peer.SetLocalDescription(offer); err != nil {
				log.Errorf("SetLocalDescription error: %v", err)
				return status.Errorf(codes.Internal, "join error %s", err)
			}

			err = stream.Send(&pb.SignalReply{
				Payload: &pb.SignalReply_Negotiate{
					Negotiate: &pb.SessionDescription{
						Type: offer.Type.String(),
						Sdp:  []byte(offer.SDP),
					},
				},
			})
			if err != nil {
				return status.Errorf(codes.Internal, "%s", err)
			}

		case *pb.SignalRequest_Negotiate:
			if peer == nil {
				return status.Errorf(codes.FailedPrecondition, "%s", errNoPeer)
			}
			if payload.Negotiate == nil {
				return status.Errorf(codes.InvalidArgument, "%s", "missing session description")
			}

			// Any other description type is ignored.
			switch payload.Negotiate.Type {
			case webrtc.SDPTypeOffer.String():
				offer := webrtc.SessionDescription{
					Type: webrtc.SDPTypeOffer,
					SDP:  string(payload.Negotiate.Sdp),
				}

				// Peer exists, renegotiating existing peer
				if err = peer.SetRemoteDescription(offer); err != nil {
					return status.Errorf(codes.Internal, "%s", err)
				}

				var answer webrtc.SessionDescription
				answer, err = peer.CreateAnswer()
				if err != nil {
					return status.Errorf(codes.Internal, "%s", err)
				}

				if err = peer.SetLocalDescription(answer); err != nil {
					return status.Errorf(codes.Internal, "%s", err)
				}

				err = stream.Send(&pb.SignalReply{
					Payload: &pb.SignalReply_Negotiate{
						Negotiate: &pb.SessionDescription{
							Type: answer.Type.String(),
							Sdp:  []byte(answer.SDP),
						},
					},
				})
				if err != nil {
					return status.Errorf(codes.Internal, "%s", err)
				}

			case webrtc.SDPTypeAnswer.String():
				err = peer.SetRemoteDescription(webrtc.SessionDescription{
					Type: webrtc.SDPTypeAnswer,
					SDP:  string(payload.Negotiate.Sdp),
				})
				if err != nil {
					return status.Errorf(codes.Internal, "%s", err)
				}
			}

		case *pb.SignalRequest_Trickle:
			if peer == nil {
				return status.Errorf(codes.FailedPrecondition, "%s", errNoPeer)
			}
			if payload.Trickle == nil {
				return status.Errorf(codes.InvalidArgument, "%s", "missing ice candidate")
			}

			var candidate webrtc.ICECandidateInit
			if err := json.Unmarshal([]byte(payload.Trickle.Init), &candidate); err != nil {
				// Do not hand a zero-valued candidate to the transport.
				log.Errorf("error parsing ice candidate: %v", err)
				return status.Errorf(codes.InvalidArgument, "error parsing ice candidate: %s", err)
			}

			if err := peer.AddICECandidate(candidate); err != nil {
				return status.Errorf(codes.Internal, "error adding ice candidate")
			}
		}
	}
}
// main parses the flags and config, then serves the SFU over two front
// ends that share one server value:
//
//   - gRPC on gRPCAddr, served from a background goroutine;
//   - JSON-RPC over websocket at path /ws on jsonRPCAddr, served from the
//     main goroutine.
//
// If parse fails (bad config or -h) the usage is printed and the process
// exits with status -1. A listener that cannot be started or that stops
// serving panics via log.Panicf.
func main() {
	if !parse() {
		showHelp()
		os.Exit(-1)
	}

	log.Infof("--- Starting SFU Node ---")
	rpc := NewRPC()

	upgrader := websocket.Upgrader{
		// NOTE(review): every Origin is accepted, so any web page can open
		// a signalling socket to this server. Confirm this is intended
		// before exposing the endpoint publicly.
		CheckOrigin: func(r *http.Request) bool {
			return true
		},
		ReadBufferSize:  1024,
		WriteBufferSize: 1024,
	}

	http.Handle("/ws", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		c, err := upgrader.Upgrade(w, r, nil)
		if err != nil {
			// A failed upgrade is a client-side problem, not a reason to
			// panic; Upgrade has already answered with an HTTP error.
			log.Errorf("websocket upgrade error: %v", err)
			return
		}
		defer c.Close()

		// Per-connection state, handed to Handle through the context.
		p := &peerContext{}
		ctx := context.WithValue(r.Context(), peerCtxKey, p)
		jc := jsonrpc2.NewConn(ctx, websocketjsonrpc2.NewObjectStream(c), rpc)

		// Block until the client disconnects, then release its peer.
		<-jc.DisconnectNotify()
		if p.peer != nil {
			log.Infof("Closing peer")
			p.peer.Close()
		}
	}))

	log.Infof("GPRC listening at http://[%s]", gRPCAddr)
	log.Infof("JSON-RPC listening at http://[%s]", jsonRPCAddr)

	// GRPC Listener
	go func() {
		lis, err := net.Listen("tcp", gRPCAddr)
		if err != nil {
			log.Panicf("%s", err)
		}

		s := grpc.NewServer()
		pb.RegisterSFUServer(s, rpc)
		if err := s.Serve(lis); err != nil {
			log.Panicf("failed to serve: %v", err)
		}
	}()

	// JSON-RPC Listener. Served on the main goroutine so that a failure to
	// bind or serve is reported instead of being silently dropped.
	if err := http.ListenAndServe(jsonRPCAddr, nil); err != nil {
		log.Panicf("failed to serve json-rpc: %v", err)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment