Skip to content

Instantly share code, notes, and snippets.

@Maldris
Created November 9, 2017 23:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Maldris/4f62a8b7456fd82c3de6551ef4d8d99c to your computer and use it in GitHub Desktop.
Save Maldris/4f62a8b7456fd82c3de6551ef4d8d99c to your computer and use it in GitHub Desktop.
test websocket sever connection logic
// +build linux
package web
import (
"encoding/base64"
"net"
"net/http"
"bitbucket.org/legalautomation/logging"
"bitbucket.org/legalautomation/sessions-server/data"
"bitbucket.org/legalautomation/sessions-server/data/APIUsers"
"bitbucket.org/legalautomation/sessions-server/web/validators"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"github.com/mailru/easygo/netpoll"
)
// channel wraps websocket connection, both API and User
type channel struct {
conn net.Conn // WebSocket connection.
open bool
User bool
poller netpoll.Poller
readDesc *netpoll.Desc
readerAvailable chan struct{}
writerAvailable chan struct{}
API uint
}
func startV1APIWebsocket(w http.ResponseWriter, r *http.Request) {
logging.Info("web-sockets-api", "starting websocket, connecting from: ", r.RemoteAddr)
apiKey := r.URL.Query().Get("a")
password := r.URL.Query().Get("q")
bpswd, err := base64.StdEncoding.DecodeString(password)
if err != nil {
logging.Error("web-sockets-api", "Error decoding password base64: ", err)
http.Error(w, "Invalid Credentials", 401)
return
}
valid, reason := validators.ValidateAPIUserDetails(apiKey, string(bpswd))
if !valid {
http.Error(w, reason, 401)
return
}
usr, err := APIUsers.CheckAuth(apiKey, string(bpswd), r.RemoteAddr, data.DB)
if err != nil {
logging.Error("web-sockets-api", "Error checking API user Auth: ", err)
http.Error(w, "Invalid Credentials", 401)
return
}
conn, _, _, err := ws.UpgradeHTTP(r, w, nil)
if err != nil {
logging.Error("web-sockets-api", "Error upgrading connection: ", err)
http.Error(w, "Error starting websocket", 500)
return
}
ch := newChannel(conn, false, usr.ID)
ch.poller, err = netpoll.New(&netpoll.Config{
OnWaitError: func(err error) {
logging.Error("web-sockets-api", "Error on wait for poller: ", err)
},
})
if err != nil {
logging.Error("web-main-start", "Error creating poller: ", err)
ch.disconnect()
http.Error(w, "Error creating read listener", 500)
return
}
ch.readDesc, err = netpoll.HandleRead(conn)
if err != nil {
logging.Error("web-sockets-api", "Error creating read listener: ", err)
ch.disconnect()
http.Error(w, "Error creating read listener", 500)
return
}
ch.poller.Start(ch.readDesc, func(evt netpoll.Event) {
// We spawn goroutine here to prevent poller wait loop
// to become locked during receiving packet from ch.
// pool.Schedule(ch.Receive)
if ch.open {
go ch.receive()
}
})
}
func newChannel(conn net.Conn, user bool, api uint) *channel {
c := &channel{
conn: conn,
open: true,
User: user,
readerAvailable: make(chan struct{}, 1),
writerAvailable: make(chan struct{}, 1),
API: api,
}
c.readerAvailable <- struct{}{}
c.writerAvailable <- struct{}{}
return c
}
func (ch *channel) receive() {
_, ok := <-ch.readerAvailable
if !ok {
return
}
pkt, _, err := wsutil.ReadClientData(ch.conn)
if err != nil {
logging.Error("web-socket-read", "Error reading websocket: ", err)
ch.disconnect()
return
}
ch.readerAvailable <- struct{}{}
if ch.User {
ch.handleUserMessage(pkt)
} else {
ch.handleAPIMessage(pkt)
}
}
func (ch *channel) Send(p []byte) {
ch.write(p)
}
func (ch *channel) write(pkt []byte) {
_, ok := <-ch.writerAvailable
if !ok {
return
}
err := wsutil.WriteServerMessage(ch.conn, ws.OpBinary, pkt)
if err != nil {
logging.Error("web-socket-read", "Error writing websocket: ", err)
ch.disconnect()
return
}
ch.writerAvailable <- struct{}{}
}
func (ch *channel) disconnect() {
if ch.open {
ch.open = false
ch.poller.Stop(ch.readDesc)
ch.poller = nil
ch.readDesc.Close()
ch.readDesc = nil
close(ch.readerAvailable)
close(ch.writerAvailable)
ch.conn.Close()
logging.Info("web-socket-close", "Closed websocket for ", ch.API)
}
}
// +build windows
package web
import (
"encoding/base64"
"net"
"net/http"
"bitbucket.org/legalautomation/logging"
"bitbucket.org/legalautomation/sessions-server/data"
"bitbucket.org/legalautomation/sessions-server/data/APIUsers"
"bitbucket.org/legalautomation/sessions-server/web/validators"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
)
// channel wraps websocket connection, both API and User
type channel struct {
conn net.Conn // WebSocket connection.
open bool
User bool
writerAvailable chan struct{}
API uint
}
func startV1APIWebsocket(w http.ResponseWriter, r *http.Request) {
logging.Info("web-sockets-api", "starting websocket, connecting from: ", r.RemoteAddr)
apiKey := r.URL.Query().Get("a")
password := r.URL.Query().Get("q")
bpswd, err := base64.StdEncoding.DecodeString(password)
if err != nil {
logging.Error("web-sockets-api", "Error decoding password base64: ", err)
http.Error(w, "Invalid Credentials", 401)
return
}
valid, reason := validators.ValidateAPIUserDetails(apiKey, string(bpswd))
if !valid {
http.Error(w, reason, 401)
return
}
usr, err := APIUsers.CheckAuth(apiKey, string(bpswd), r.RemoteAddr, data.DB)
if err != nil {
logging.Error("web-sockets-api", "Error checking API user Auth: ", err)
http.Error(w, "Invalid Credentials", 401)
return
}
conn, _, _, err := ws.UpgradeHTTP(r, w, nil)
if err != nil {
logging.Error("web-sockets-api", "Error upgrading connection: ", err)
http.Error(w, "Error starting websocket", 500)
conn.Close()
return
}
ch := newChannel(conn, false, usr.ID)
go ch.receive()
}
func newChannel(conn net.Conn, user bool, api uint) *channel {
c := &channel{
conn: conn,
User: user,
API: api,
writerAvailable: make(chan struct{}, 1),
}
c.writerAvailable <- struct{}{}
return c
}
func (ch *channel) receive() {
// logging.Info("web-socket-read", "Reading")
for {
pkt, _, err := wsutil.ReadClientData(ch.conn)
if err != nil {
logging.Error("web-socket-read", "Error reading websocket: ", err)
ch.disconnect()
return
}
// logging.Info("web-socket-read", "Data: ", string(pkt))
if ch.User {
ch.handleUserMessage(pkt)
} else {
ch.handleAPIMessage(pkt)
}
}
}
func (ch *channel) Send(p []byte) {
ch.write(p)
}
func (ch *channel) write(pkt []byte) {
_, ok := <-ch.writerAvailable
if !ok {
return
}
err := wsutil.WriteServerMessage(ch.conn, ws.OpBinary, pkt)
if err != nil {
logging.Error("web-socket-read", "Error writing websocket: ", err)
ch.disconnect()
return
}
ch.writerAvailable <- struct{}{}
}
func (ch *channel) disconnect() {
if ch.open {
ch.open = false
close(ch.writerAvailable)
ch.conn.Close()
logging.Info("web-socket-close", "Closed websocket for ", ch.API)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment