Created
August 26, 2021 10:21
-
-
Save marvin-hansen/b19acec117455c577516c4eaa2e246dc to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright (c) 2021. Marvin Friedrich Lars Hansen. All Rights Reserved. Contact: marvin.hansen@gmail.com | |
package web_socket | |
import ( | |
"testing" | |
"time" | |
) | |
func TestNewWebSocketClient(t *testing.T) { | |
// Go to the link below & click connect before running the test | |
// https://www.piesocket.com/websocket-tester | |
url := "wss://demo.piesocket.com/v3/channel_1?api_key=oCdCMcMPQpbvNjUIzqtvF1d2X2okWpDQj4AwARJuAgtjhzKxVEjQU6IdCjwm¬ify_self" | |
errHandler := logError | |
msgHandler := getWSHandler(logError) | |
client, _ := NewWebSocketClient(url, msgHandler, errHandler) | |
time.Sleep(time.Second * 5) | |
client.Stop() | |
} | |
func getWSHandler(errHandler WsErrHandler) func(message []byte) { | |
wsHandler := func(message []byte) { | |
println(string(message)) | |
} | |
return wsHandler | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// https://webdevelop.pro/blog/guide-creating-websocket-client-golang-using-mutex-and-channel | |
// https://github.com/webdeveloppro/golang-websocket-client/blob/master/pkg/client/client.go | |
package web_socket | |
import ( | |
"context" | |
"fmt" | |
"github.com/gorilla/websocket" | |
"log" | |
"sync" | |
"time" | |
) | |
// pingPeriod is the interval at which the client sends ping frames to the peer.
const pingPeriod = time.Second * 30
// WebSocketClient return websocket client connection | |
type WebSocketClient struct { | |
url string | |
sendBuf chan []byte | |
ctx context.Context | |
ctxCancel context.CancelFunc | |
mu sync.RWMutex | |
wsconn *websocket.Conn | |
} | |
// NewWebSocketClient create new websocket connection | |
func NewWebSocketClient(url string, messageHandler WsHandler, errorHandler WsErrHandler) (*WebSocketClient, error) { | |
client := new(WebSocketClient) | |
client.ctx, client.ctxCancel = context.WithCancel(context.Background()) | |
client.url = url | |
client.sendBuf = make(chan []byte, 1) | |
go client.listen(messageHandler, errorHandler) | |
go client.listenWrite() | |
go client.ping() | |
return client, nil | |
} | |
func (conn *WebSocketClient) Connect() *websocket.Conn { | |
conn.mu.Lock() | |
defer conn.mu.Unlock() | |
if conn.wsconn != nil { | |
return conn.wsconn | |
} | |
ticker := time.NewTicker(time.Second) | |
defer ticker.Stop() | |
for ; ; <-ticker.C { | |
select { | |
case <-conn.ctx.Done(): | |
return nil | |
default: | |
ws, _, err := websocket.DefaultDialer.Dial(conn.url, nil) | |
if err != nil { | |
log.Println("connect", err, fmt.Sprintf("Cannot connect to websocket: %s", conn.url)) | |
continue | |
} | |
log.Println(fmt.Sprintf("connected to websocket to %s", conn.url)) | |
conn.wsconn = ws | |
return conn.wsconn | |
} | |
} | |
} | |
func (conn *WebSocketClient) listen(messageHandler WsHandler, errorHandler WsErrHandler) { | |
ticker := time.NewTicker(time.Second) | |
defer ticker.Stop() | |
for { | |
select { | |
case <-conn.ctx.Done(): | |
return | |
case <-ticker.C: | |
for { | |
ws := conn.Connect() | |
if ws == nil { | |
return | |
} | |
_, bytMsg, err := ws.ReadMessage() | |
if err != nil { | |
errorHandler(err) | |
log.Println("listen", err, "Cannot read websocket message") | |
conn.closeWs() | |
break | |
} | |
//printRawMsg(bytMsg) | |
messageHandler(bytMsg) | |
} | |
} | |
} | |
} | |
// Write data to the websocket server | |
func (conn *WebSocketClient) Write(data []byte) error { | |
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*150) | |
defer cancel() | |
for { | |
select { | |
case conn.sendBuf <- data: // : runtime error: invalid memory address or nil pointer dereference | |
return nil | |
case <-ctx.Done(): | |
return fmt.Errorf("context canceled") | |
} | |
} | |
} | |
func (conn *WebSocketClient) listenWrite() { | |
for data := range conn.sendBuf { | |
ws := conn.Connect() | |
if ws == nil { | |
err := fmt.Errorf("conn.ws is nil") | |
logError(err) | |
log.Println("listenWrite No websocket connection") | |
continue | |
} | |
if err := ws.WriteMessage( | |
websocket.TextMessage, | |
data, | |
); err != nil { | |
log.Println("listenWrite", nil, "WebSocket Write Error") | |
} | |
} | |
} | |
// Close will send close message and shutdown websocket connection | |
func (conn *WebSocketClient) Stop() { | |
conn.ctxCancel() | |
conn.closeWs() | |
} | |
// Close will send close message and shutdown websocket connection | |
func (conn *WebSocketClient) closeWs() { | |
conn.mu.Lock() | |
if conn.wsconn != nil { | |
err := conn.wsconn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) | |
if err != nil { | |
return | |
} | |
err = conn.wsconn.Close() | |
if err != nil { | |
return | |
} | |
conn.wsconn = nil | |
} | |
conn.mu.Unlock() | |
} | |
func (conn *WebSocketClient) ping() { | |
ticker := time.NewTicker(pingPeriod) | |
defer ticker.Stop() | |
for { | |
select { | |
case <-ticker.C: | |
ws := conn.Connect() | |
if ws == nil { | |
continue | |
} | |
if err := conn.wsconn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(pingPeriod/2)); err != nil { | |
conn.closeWs() | |
} | |
case <-conn.ctx.Done(): | |
return | |
} | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment