Skip to content

Instantly share code, notes, and snippets.

@zeebo
Created May 3, 2021 14:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zeebo/ae635824205088a51031c09cb07ea553 to your computer and use it in GitHub Desktop.
Save zeebo/ae635824205088a51031c09cb07ea553 to your computer and use it in GitHub Desktop.
websocket demo code
package drpcws
import (
"context"
"net/http"
"sync"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"github.com/zeebo/errs"
"storj.io/drpc"
"storj.io/drpc/drpchttp"
)
// NewGobwasConn returns a drpc.Conn backed by a gobwasConn that remembers the
// given dialer and base url for later use.
func NewGobwasConn(dialer ws.Dialer, url string) drpc.Conn {
	conn := new(gobwasConn)
	conn.dialer = dialer
	conn.url = url
	return conn
}
// gobwasConn is the client-side drpc.Conn built on gobwas/ws.
//
// TODO(jeff): both Invoke and NewStream dial a brand new websocket on every
// call. There are many ways we might want to do something different, but this
// is fine for now. An unfortunate side effect of this approach is that the
// Close, Closed and Transport methods end up being no-ops.
type gobwasConn struct {
	// dialer performs the websocket dial for each new stream.
	dialer ws.Dialer

	// url is the prefix to which the rpc name is appended when dialing.
	url string
}
// Close is a no-op and always reports success: the conn holds no long-lived
// connection of its own.
func (c *gobwasConn) Close() error {
	return nil
}
// Closed always reports false; the conn never transitions to a closed state.
func (c *gobwasConn) Closed() bool {
	return false
}
// Transport always returns nil; the conn does not expose an underlying
// transport.
func (c *gobwasConn) Transport() drpc.Transport {
	return nil
}
// Invoke performs a unary rpc: it opens a stream via NewStream, sends in,
// receives the reply into out, and closes the stream before returning. The
// first error encountered is returned; an error from closing the stream is
// discarded.
func (c *gobwasConn) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in drpc.Message, out drpc.Message) error {
	stream, err := c.NewStream(ctx, rpc, enc)
	if err != nil {
		return err
	}
	// Best-effort cleanup; the rpc result is what callers care about.
	defer func() { _ = stream.Close() }()

	if err = stream.MsgSend(in, enc); err != nil {
		return err
	}
	return stream.MsgRecv(out, enc)
}
// NewStream starts a stream with the remote by dialing a websocket at the
// conn's url with rpc appended, and wraps the resulting connection in a
// client-side, non-JSON gobwasStream. The buffered reader and handshake that
// Dial also returns are discarded.
//
// Callers are told that only one Invoke or Stream may be open at once; this
// method does not enforce that itself, since every call dials its own
// connection.
func (c *gobwasConn) NewStream(ctx context.Context, rpc string, enc drpc.Encoding) (drpc.Stream, error) {
	// TODO(jeff): support sending application/json as the content-type
	// TODO: parse + add rpc correctly
	target := c.url + rpc

	conn, _, _, err := c.dialer.Dial(ctx, target)
	if err != nil {
		return nil, err
	}

	const (
		useJSON  = false
		isClient = true
	)
	return newGobwasStream(ctx, conn, useJSON, isClient), nil
}
// NewGobwas returns an http.Handler that upgrades each request to a websocket
// using gobwas/ws and serves one rpc over it, passing the request's URL path
// to handler.HandleRPC as the rpc name.
//
// Messages use the drpchttp JSON helpers when the request's Content-Type
// header is exactly "application/json"; otherwise the encoding's own
// Marshal/Unmarshal is used.
func NewGobwas(handler drpc.Handler) http.Handler {
	return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
		ctx, err := drpchttp.Context(req)
		if err != nil {
			http.Error(rw, err.Error(), http.StatusInternalServerError)
			return
		}

		conn, _, _, err := ws.UpgradeHTTP(req, rw)
		if err != nil {
			// UpgradeHTTP hijacks the connection before it validates the
			// handshake and writes its own rejection response, so rw must
			// not be written to here. Once hijacked, the connection is ours
			// to release; without this it would leak on every failed
			// upgrade.
			if conn != nil {
				_ = conn.Close()
			}
			return
		}
		defer func() { _ = conn.Close() }()

		stream := newGobwasStream(ctx, conn, req.Header.Get("Content-Type") == "application/json", false)
		defer func() { _ = stream.Close() }()

		if err := handler.HandleRPC(stream, req.URL.Path); err != nil {
			// TODO(jeff): how to signal this error back after we've hijacked?
			// json envelope? what about if protobuf?
			_ = err
		}
	})
}
// gobwasStream is a message stream carried over a single gobwas/ws websocket
// connection.
//
// TODO(jeff): gobwasStream probably needs to look into which errors occur in
// half closed situations, and hand the appropriate errors back to callers.
type gobwasStream struct {
	// mu is held for the full duration of every send, receive and close
	// operation on the stream.
	mu sync.Mutex

	// ctx is the value returned by Context.
	ctx context.Context

	// tr is the connection that frames are read from and written to.
	tr drpc.Transport

	// json selects the drpchttp JSON helpers over the encoding's own
	// Marshal/Unmarshal.
	json bool

	// client selects the client-side wsutil read/write helpers over the
	// server-side ones.
	client bool
}
// newGobwasStream builds a gobwasStream around tr. json picks the JSON message
// helpers and client marks the stream as the client side of the websocket.
func newGobwasStream(ctx context.Context, tr drpc.Transport, json, client bool) *gobwasStream {
	s := new(gobwasStream)
	s.ctx, s.tr = ctx, tr
	s.json, s.client = json, client
	return s
}
// Context returns the context the stream was constructed with.
func (w *gobwasStream) Context() context.Context {
	return w.ctx
}
// MsgSend serializes msg and writes it to the connection as one binary
// websocket message. Serialization goes through drpchttp.JSONMarshal for JSON
// streams and enc.Marshal otherwise; the write uses the client-side or
// server-side wsutil helper according to the stream's role.
func (w *gobwasStream) MsgSend(msg drpc.Message, enc drpc.Encoding) (err error) {
	w.mu.Lock()
	defer w.mu.Unlock()

	var payload []byte
	switch {
	case w.json:
		payload, err = drpchttp.JSONMarshal(msg, enc)
	default:
		payload, err = enc.Marshal(msg)
	}
	if err != nil {
		return err
	}

	write := wsutil.WriteServerBinary
	if w.client {
		write = wsutil.WriteClientBinary
	}
	return write(w.tr, payload)
}
// MsgRecv reads the next binary message from the connection and decodes it
// into msg. The read uses the wsutil helper matching the stream's role, and
// decoding goes through drpchttp.JSONUnmarshal for JSON streams and
// enc.Unmarshal otherwise.
func (w *gobwasStream) MsgRecv(msg drpc.Message, enc drpc.Encoding) (err error) {
	w.mu.Lock()
	defer w.mu.Unlock()

	// A server reads what the client wrote, and vice versa.
	read := wsutil.ReadClientBinary
	if w.client {
		read = wsutil.ReadServerBinary
	}

	var payload []byte
	if payload, err = read(w.tr); err != nil {
		return err
	}

	if !w.json {
		return enc.Unmarshal(payload, msg)
	}
	return drpchttp.JSONUnmarshal(payload, msg, enc)
}
// CloseSend writes a websocket close frame with an empty payload to the
// connection and returns the write error, if any. The connection itself is
// left open.
func (w *gobwasStream) CloseSend() error {
	w.mu.Lock()
	defer w.mu.Unlock()

	frame := ws.NewCloseFrame(nil)
	if w.client {
		// RFC 6455 requires every frame sent by a client to be masked; an
		// unmasked close frame is a protocol error on the server side.
		frame = ws.MaskFrameInPlace(frame)
	}
	return ws.WriteFrame(w.tr, frame)
}
// Close writes a websocket close frame with an empty payload and then closes
// the underlying connection. The connection is closed even if writing the
// frame fails; both errors are combined into the returned error.
func (w *gobwasStream) Close() error {
	w.mu.Lock()
	defer w.mu.Unlock()

	frame := ws.NewCloseFrame(nil)
	if w.client {
		// RFC 6455 requires every frame sent by a client to be masked; an
		// unmasked close frame is a protocol error on the server side.
		frame = ws.MaskFrameInPlace(frame)
	}

	writeErr := ws.WriteFrame(w.tr, frame)
	closeErr := w.tr.Close()
	return errs.Combine(writeErr, closeErr)
}
package drpcws
import (
"context"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
"storj.io/drpc"
"storj.io/drpc/drpchttp"
)
// NewGorilla returns an http.Handler that upgrades each request to a websocket
// using gorilla/websocket and serves one rpc over it, passing the request's
// URL path to handler.HandleRPC as the rpc name.
//
// Messages use the drpchttp JSON helpers when the request's Content-Type
// header is exactly "application/json"; otherwise the encoding's own
// Marshal/Unmarshal is used.
func NewGorilla(handler drpc.Handler) http.Handler {
	serve := func(rw http.ResponseWriter, req *http.Request) {
		ctx, err := drpchttp.Context(req)
		if err != nil {
			http.Error(rw, err.Error(), http.StatusInternalServerError)
			return
		}

		upgrader := websocket.Upgrader{
			ReadBufferSize:  1024,
			WriteBufferSize: 1024,
		}
		conn, err := upgrader.Upgrade(rw, req, nil)
		if err != nil {
			// Nothing is written to rw here on a failed upgrade.
			return
		}
		defer func() { _ = conn.Close() }()

		useJSON := req.Header.Get("Content-Type") == "application/json"
		stream := newGorillaStream(ctx, conn, useJSON)
		defer func() { _ = stream.Close() }()

		if err := handler.HandleRPC(stream, req.URL.Path); err != nil {
			// TODO(jeff): how to signal this error back after we've hijacked?
			// json envelope? what about if protobuf?
			_ = err
		}
	}
	return http.HandlerFunc(serve)
}
// gorillaStream is a message stream carried over a single gorilla/websocket
// connection.
//
// TODO(jeff): gorillaStream probably needs to look into which errors occur in
// half closed situations, and hand the appropriate errors back to callers.
type gorillaStream struct {
	// mu is held for the full duration of every send, receive and close
	// operation on the stream.
	mu sync.Mutex

	// ctx is the value returned by Context.
	ctx context.Context

	// conn is the websocket connection messages are read from and written to.
	conn *websocket.Conn

	// json selects the drpchttp JSON helpers over the encoding's own
	// Marshal/Unmarshal.
	json bool
}
// newGorillaStream builds a gorillaStream around conn. json picks the JSON
// message helpers.
func newGorillaStream(ctx context.Context, conn *websocket.Conn, json bool) *gorillaStream {
	s := new(gorillaStream)
	s.ctx = ctx
	s.conn = conn
	s.json = json
	return s
}
// Context returns the context the stream was constructed with.
func (w *gorillaStream) Context() context.Context {
	return w.ctx
}
// MsgSend serializes msg and writes it to the connection as one binary
// websocket message. Serialization goes through drpchttp.JSONMarshal for JSON
// streams and enc.Marshal otherwise.
func (w *gorillaStream) MsgSend(msg drpc.Message, enc drpc.Encoding) (err error) {
	w.mu.Lock()
	defer w.mu.Unlock()

	var payload []byte
	switch {
	case w.json:
		payload, err = drpchttp.JSONMarshal(msg, enc)
	default:
		payload, err = enc.Marshal(msg)
	}
	if err != nil {
		return err
	}

	return w.conn.WriteMessage(websocket.BinaryMessage, payload)
}
// MsgRecv reads the next message from the connection, ignoring its message
// type, and decodes the payload into msg. Decoding goes through
// drpchttp.JSONUnmarshal for JSON streams and enc.Unmarshal otherwise.
func (w *gorillaStream) MsgRecv(msg drpc.Message, enc drpc.Encoding) (err error) {
	w.mu.Lock()
	defer w.mu.Unlock()

	var payload []byte
	if _, payload, err = w.conn.ReadMessage(); err != nil {
		return err
	}

	if !w.json {
		return enc.Unmarshal(payload, msg)
	}
	return drpchttp.JSONUnmarshal(payload, msg, enc)
}
// CloseSend writes a close control message with no payload, allowing up to
// one minute for the write, and returns the write error, if any. The
// connection itself is left open.
func (w *gorillaStream) CloseSend() error {
	w.mu.Lock()
	defer w.mu.Unlock()

	deadline := time.Now().Add(time.Minute)
	return w.conn.WriteControl(websocket.CloseMessage, nil, deadline)
}
// Close writes a close control message with no payload, allowing up to one
// minute for the write, and returns the write error, if any. It does not
// close the underlying connection.
func (w *gorillaStream) Close() error {
	w.mu.Lock()
	defer w.mu.Unlock()

	deadline := time.Now().Add(time.Minute)
	return w.conn.WriteControl(websocket.CloseMessage, nil, deadline)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment