Skip to content

Instantly share code, notes, and snippets.

@marvin-hansen
Created August 26, 2021 10:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save marvin-hansen/b19acec117455c577516c4eaa2e246dc to your computer and use it in GitHub Desktop.
Save marvin-hansen/b19acec117455c577516c4eaa2e246dc to your computer and use it in GitHub Desktop.
// Copyright (c) 2021. Marvin Friedrich Lars Hansen. All Rights Reserved. Contact: marvin.hansen@gmail.com
package web_socket
import (
"testing"
"time"
)
// TestNewWebSocketClient is a manual smoke test: it starts a client against
// the public PieSocket demo endpoint, lets it run for five seconds so that
// incoming messages are printed by the handler, and then stops it.
//
// The test needs network access, so it is skipped when run with -short.
func TestNewWebSocketClient(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping websocket smoke test in -short mode: requires network access")
	}

	// Go to the link below & click connect before running the test
	// https://www.piesocket.com/websocket-tester
	url := "wss://demo.piesocket.com/v3/channel_1?api_key=oCdCMcMPQpbvNjUIzqtvF1d2X2okWpDQj4AwARJuAgtjhzKxVEjQU6IdCjwm&notify_self"

	errHandler := logError
	msgHandler := getWSHandler(errHandler)

	client, err := NewWebSocketClient(url, msgHandler, errHandler)
	if err != nil {
		t.Fatalf("NewWebSocketClient(%q): %v", url, err)
	}
	// Always stop the client so its background goroutines are cancelled.
	defer client.Stop()

	// Give the client time to connect and receive a few messages.
	time.Sleep(5 * time.Second)
}
// getWSHandler builds the message callback used by the test. The returned
// function prints every received payload as text using the built-in println.
// The errHandler argument is accepted but not used by the callback.
func getWSHandler(errHandler WsErrHandler) func(message []byte) {
	return func(payload []byte) {
		println(string(payload))
	}
}
// https://webdevelop.pro/blog/guide-creating-websocket-client-golang-using-mutex-and-channel
// https://github.com/webdeveloppro/golang-websocket-client/blob/master/pkg/client/client.go
package web_socket
import (
"context"
"fmt"
"github.com/gorilla/websocket"
"log"
"sync"
"time"
)
// pingPeriod is the interval at which the ping loop sends a ping control
// frame to the peer; half of this value is used as the ping write deadline.
const pingPeriod = 30 * time.Second
// WebSocketClient is a websocket client that dials its connection on demand
// and runs background goroutines for reading, writing and pinging.
// Create it with NewWebSocketClient and shut it down with Stop.
type WebSocketClient struct {
// url is the websocket endpoint passed to the dialer.
url string
// sendBuf carries outgoing payloads from Write to the write loop.
sendBuf chan []byte
// ctx is cancelled by Stop; the background loops watch it to terminate.
ctx context.Context
// ctxCancel cancels ctx.
ctxCancel context.CancelFunc
// mu is held while wsconn is established (Connect) or torn down (closeWs).
mu sync.RWMutex
// wsconn is the current connection, or nil when not connected.
wsconn *websocket.Conn
}
// NewWebSocketClient creates a client for url and starts its three background
// goroutines: the read loop (which feeds messageHandler and errorHandler),
// the write loop, and the ping loop. The connection itself is dialed by those
// goroutines, not by this call. The returned error is always nil.
func NewWebSocketClient(url string, messageHandler WsHandler, errorHandler WsErrHandler) (*WebSocketClient, error) {
	ctx, cancel := context.WithCancel(context.Background())

	client := &WebSocketClient{
		url:       url,
		sendBuf:   make(chan []byte, 1),
		ctx:       ctx,
		ctxCancel: cancel,
	}

	go client.listen(messageHandler, errorHandler)
	go client.listenWrite()
	go client.ping()

	return client, nil
}
// Connect returns the current websocket connection, dialing a new one when
// none is established.
//
// Failed dial attempts are logged and retried once per second. The client
// mutex is held for the whole call, so both the dial and the wait between
// retries observe the client context: once the context is cancelled the call
// returns promptly instead of keeping the lock until the next tick or until
// the dial times out on its own.
//
// It returns nil only when the client context is cancelled before a
// connection could be established.
func (conn *WebSocketClient) Connect() *websocket.Conn {
	conn.mu.Lock()
	defer conn.mu.Unlock()

	if conn.wsconn != nil {
		return conn.wsconn
	}

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for {
		if conn.ctx.Err() != nil {
			return nil
		}

		ws, _, err := websocket.DefaultDialer.DialContext(conn.ctx, conn.url, nil)
		if err == nil {
			log.Printf("connected to websocket to %s", conn.url)
			conn.wsconn = ws
			return ws
		}
		if conn.ctx.Err() != nil {
			// The dial was aborted by cancellation; that is a shutdown, not
			// a connection failure worth logging.
			return nil
		}
		log.Printf("connect %v Cannot connect to websocket: %s", err, conn.url)

		// Wait for the next retry slot, but give up as soon as the client
		// is stopped.
		select {
		case <-conn.ctx.Done():
			return nil
		case <-ticker.C:
		}
	}
}
// listen is the read loop. After each one-second tick it reads messages from
// the connection returned by Connect and hands every payload to
// messageHandler. A read error is passed to errorHandler and logged, the
// connection is closed, and reading resumes after the next tick. The loop
// ends when the client context is cancelled or Connect returns nil.
func (conn *WebSocketClient) listen(messageHandler WsHandler, errorHandler WsErrHandler) {
	retry := time.NewTicker(time.Second)
	defer retry.Stop()

	for {
		// Wait for the next tick, or stop if the client has been cancelled.
		select {
		case <-conn.ctx.Done():
			return
		case <-retry.C:
		}

		// Read until the connection fails.
		for {
			ws := conn.Connect()
			if ws == nil {
				return
			}

			_, payload, readErr := ws.ReadMessage()
			if readErr != nil {
				errorHandler(readErr)
				log.Println("listen", readErr, "Cannot read websocket message")
				conn.closeWs()
				break
			}

			messageHandler(payload)
		}
	}
}
// Write queues data for delivery to the websocket server by the write loop.
//
// It returns an error when the receiver is nil, when the client has already
// been stopped, or when the payload could not be queued within 150ms because
// the send buffer stayed full. A nil return means the payload was queued, not
// that it has been written to the connection.
func (conn *WebSocketClient) Write(data []byte) error {
	// A nil client used to panic on the channel send below.
	if conn == nil {
		return fmt.Errorf("websocket client is nil")
	}
	// Refuse new payloads after Stop: nothing will deliver them any more.
	if conn.ctx.Err() != nil {
		return fmt.Errorf("websocket client is stopped")
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*150)
	defer cancel()

	select {
	case conn.sendBuf <- data:
		return nil
	case <-conn.ctx.Done():
		return fmt.Errorf("websocket client is stopped")
	case <-ctx.Done():
		// Message kept as-is for callers that match on the original text.
		return fmt.Errorf("context canceled")
	}
}
// listenWrite is the write loop: it takes payloads queued by Write from the
// send buffer and sends each one as a text message on the connection
// returned by Connect. Write failures are logged and the payload is dropped.
//
// The loop terminates when the client context is cancelled or the send
// buffer is closed, so the goroutine does not outlive Stop.
func (conn *WebSocketClient) listenWrite() {
	for {
		select {
		case <-conn.ctx.Done():
			return
		case data, ok := <-conn.sendBuf:
			if !ok {
				return
			}

			ws := conn.Connect()
			if ws == nil {
				err := fmt.Errorf("conn.ws is nil")
				logError(err)
				log.Println("listenWrite No websocket connection")
				continue
			}

			if err := ws.WriteMessage(websocket.TextMessage, data); err != nil {
				// Log the actual error; the original logged a literal nil.
				log.Println("listenWrite", err, "WebSocket Write Error")
			}
		}
	}
}
// Stop cancels the client context, which signals the background loops to
// terminate, and then closes the current websocket connection via closeWs.
func (conn *WebSocketClient) Stop() {
conn.ctxCancel()
conn.closeWs()
}
// closeWs sends a normal-closure close frame on the current connection,
// closes it, and clears it so that the next Connect dials a fresh one.
// It is a no-op when there is no connection.
//
// The mutex is released on every path, and the connection is always closed
// and cleared even when the close frame cannot be sent; previously an error
// from either step returned with the mutex still locked and the stale
// connection still installed.
func (conn *WebSocketClient) closeWs() {
	// Upper bound on how long sending the close frame may block.
	const closeWait = time.Second

	conn.mu.Lock()
	defer conn.mu.Unlock()

	if conn.wsconn == nil {
		return
	}

	// The close frame is best-effort: the peer may already be gone, which is
	// the usual reason this method is called. WriteControl is used because it
	// may run concurrently with the write loop and takes a deadline.
	_ = conn.wsconn.WriteControl(
		websocket.CloseMessage,
		websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""),
		time.Now().Add(closeWait),
	)

	// Nothing useful can be done if closing the socket fails; the connection
	// is discarded either way.
	_ = conn.wsconn.Close()
	conn.wsconn = nil
}
// ping is the keep-alive loop: every pingPeriod it sends a ping control frame
// on the connection returned by Connect, with a write deadline of half the
// period. If the ping cannot be written the connection is closed so that it
// gets re-established on next use. The loop ends when the client context is
// cancelled.
func (conn *WebSocketClient) ping() {
	ticker := time.NewTicker(pingPeriod)
	defer ticker.Stop()

	for {
		select {
		case <-conn.ctx.Done():
			return
		case <-ticker.C:
			ws := conn.Connect()
			if ws == nil {
				continue
			}

			// Use the connection handed out by Connect. Reading conn.wsconn
			// here without the mutex races with closeWs and can be nil.
			deadline := time.Now().Add(pingPeriod / 2)
			if err := ws.WriteControl(websocket.PingMessage, []byte{}, deadline); err != nil {
				conn.closeWs()
			}
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment