Skip to content

Instantly share code, notes, and snippets.

@gloriousCode
Last active June 29, 2023 03:44
Show Gist options
  • Save gloriousCode/bf267388915a9cdc2f7899bd97b8c3a3 to your computer and use it in GitHub Desktop.
Save gloriousCode/bf267388915a9cdc2f7899bd97b8c3a3 to your computer and use it in GitHub Desktop.
This ensures that the multiplexor is shutdown, that any listeners exit and extra locks are protecting the connchecker. The funtimer is used to simulate a disconnection error every 10 seconds its connected to ensure things are cleaned up
diff --git a/connchecker/connchecker.go b/connchecker/connchecker.go
index 59705ce6f..072837719 100644
--- a/connchecker/connchecker.go
+++ b/connchecker/connchecker.go
@@ -51,7 +51,7 @@ func New(dnsList, domainList []string, checkInterval time.Duration) (*Checker, e
if err := c.initialCheck(); err != nil {
return nil, err
}
-
+ c.mu.Lock()
if c.connected {
log.Debugln(log.Global, ConnFound)
} else {
@@ -59,6 +59,7 @@ func New(dnsList, domainList []string, checkInterval time.Duration) (*Checker, e
}
c.shutdown = make(chan struct{}, 1)
+ c.mu.Unlock()
var wg sync.WaitGroup
wg.Add(1)
go c.Monitor(&wg)
@@ -79,7 +80,9 @@ type Checker struct {
// Shutdown cleanly shutsdown monitor routine
func (c *Checker) Shutdown() {
+ c.mu.Lock()
c.connected = false
+ c.mu.Unlock()
close(c.shutdown)
c.wg.Wait()
}
@@ -126,7 +129,9 @@ func (c *Checker) initialCheck() error {
connected = true
}
}
+ c.mu.Lock()
c.connected = connected
+ c.mu.Unlock()
return nil
}
diff --git a/exchanges/okx/okx_websocket.go b/exchanges/okx/okx_websocket.go
index 24c04406d..a9ac554ce 100644
--- a/exchanges/okx/okx_websocket.go
+++ b/exchanges/okx/okx_websocket.go
@@ -229,10 +229,10 @@ func (ok *Okx) WsConnect() error {
if err != nil {
return err
}
- ok.Websocket.Wg.Add(2)
+ ok.Websocket.Wg.Add(3)
go ok.wsFunnelConnectionData(ok.Websocket.Conn)
go ok.WsReadData()
- go ok.WsResponseMultiplexer.Run()
+ go ok.WsResponseMultiplexer.Run(ok)
if ok.Verbose {
log.Debugf(log.ExchangeSys, "Successful connection to %v\n",
ok.Websocket.GetWebsocketURL())
@@ -242,6 +242,7 @@ func (ok *Okx) WsConnect() error {
Message: pingMsg,
Delay: time.Second * 27,
})
+
if ok.IsWebsocketAuthenticationSupported() {
var authDialer websocket.Dialer
authDialer.ReadBufferSize = 8192
@@ -1691,10 +1692,13 @@ func (ok *Okx) WsAmendMultipleOrders(args []AmendOrderRequestParams) ([]OrderDat
}
// Run this functions distributes websocket request responses to
-func (m *wsRequestDataChannelsMultiplexer) Run() {
+func (m *wsRequestDataChannelsMultiplexer) Run(ok *Okx) {
+ defer ok.Websocket.Wg.Done()
tickerData := time.NewTicker(time.Second)
for {
select {
+ case <-ok.Websocket.ShutdownC:
+ return
case <-tickerData.C:
for x, myChan := range m.WsResponseChannelsMap {
if myChan == nil {
diff --git a/exchanges/stream/websocket.go b/exchanges/stream/websocket.go
index 1b674a4bc..cd3c33c8f 100644
--- a/exchanges/stream/websocket.go
+++ b/exchanges/stream/websocket.go
@@ -392,10 +392,10 @@ func (w *Websocket) connectionMonitor() error {
case err := <-w.ReadMessageErrors:
if isDisconnectionError(err) {
w.setInit(false)
+ w.shutdownCleanup()
log.Warnf(log.WebsocketMgr,
"%v websocket has been disconnected. Reason: %v",
w.exchangeName, err)
- w.setConnectedStatus(false)
} else {
// pass off non disconnect errors to datahandler to manage
w.DataHandler <- err
@@ -423,8 +423,6 @@ func (w *Websocket) connectionMonitor() error {
// Shutdown attempts to shut down a websocket connection and associated routines
// by using a package defined shutdown function
func (w *Websocket) Shutdown() error {
- w.m.Lock()
- defer w.m.Unlock()
if !w.IsConnected() {
return fmt.Errorf("%v websocket: cannot shutdown %w",
@@ -443,9 +441,6 @@ func (w *Websocket) Shutdown() error {
"%v websocket: shutting down websocket\n",
w.exchangeName)
}
-
- defer w.Orderbook.FlushBuffer()
-
if w.Conn != nil {
if err := w.Conn.Shutdown(); err != nil {
return err
@@ -457,6 +452,14 @@ func (w *Websocket) Shutdown() error {
return err
}
}
+ w.shutdownCleanup()
+ return nil
+}
+
+func (w *Websocket) shutdownCleanup() {
+ w.m.Lock()
+ defer w.m.Unlock()
+ defer w.Orderbook.FlushBuffer()
// flush any subscriptions from last connection if needed
w.subscriptionMutex.Lock()
@@ -473,7 +476,6 @@ func (w *Websocket) Shutdown() error {
"%v websocket: completed websocket shutdown\n",
w.exchangeName)
}
- return nil
}
// FlushChannels flushes channel subscriptions when there is a pair/asset change
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment