Last active
June 29, 2023 03:44
-
-
Save gloriousCode/bf267388915a9cdc2f7899bd97b8c3a3 to your computer and use it in GitHub Desktop.
This ensures that the multiplexer is shut down, that any listeners exit, and that extra locks protect the connchecker. The funtimer is used to simulate a disconnection error every 10 seconds while it is connected, to ensure things are cleaned up.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/connchecker/connchecker.go b/connchecker/connchecker.go | |
index 59705ce6f..072837719 100644 | |
--- a/connchecker/connchecker.go | |
+++ b/connchecker/connchecker.go | |
@@ -51,7 +51,7 @@ func New(dnsList, domainList []string, checkInterval time.Duration) (*Checker, e | |
if err := c.initialCheck(); err != nil { | |
return nil, err | |
} | |
- | |
+ c.mu.Lock() | |
if c.connected { | |
log.Debugln(log.Global, ConnFound) | |
} else { | |
@@ -59,6 +59,7 @@ func New(dnsList, domainList []string, checkInterval time.Duration) (*Checker, e | |
} | |
c.shutdown = make(chan struct{}, 1) | |
+ c.mu.Unlock() | |
var wg sync.WaitGroup | |
wg.Add(1) | |
go c.Monitor(&wg) | |
@@ -79,7 +80,9 @@ type Checker struct { | |
// Shutdown cleanly shutsdown monitor routine | |
func (c *Checker) Shutdown() { | |
+ c.mu.Lock() | |
c.connected = false | |
+ c.mu.Unlock() | |
close(c.shutdown) | |
c.wg.Wait() | |
} | |
@@ -126,7 +129,9 @@ func (c *Checker) initialCheck() error { | |
connected = true | |
} | |
} | |
+ c.mu.Lock() | |
c.connected = connected | |
+ c.mu.Unlock() | |
return nil | |
} | |
diff --git a/exchanges/okx/okx_websocket.go b/exchanges/okx/okx_websocket.go | |
index 24c04406d..a9ac554ce 100644 | |
--- a/exchanges/okx/okx_websocket.go | |
+++ b/exchanges/okx/okx_websocket.go | |
@@ -229,10 +229,10 @@ func (ok *Okx) WsConnect() error { | |
if err != nil { | |
return err | |
} | |
- ok.Websocket.Wg.Add(2) | |
+ ok.Websocket.Wg.Add(3) | |
go ok.wsFunnelConnectionData(ok.Websocket.Conn) | |
go ok.WsReadData() | |
- go ok.WsResponseMultiplexer.Run() | |
+ go ok.WsResponseMultiplexer.Run(ok) | |
if ok.Verbose { | |
log.Debugf(log.ExchangeSys, "Successful connection to %v\n", | |
ok.Websocket.GetWebsocketURL()) | |
@@ -242,6 +242,7 @@ func (ok *Okx) WsConnect() error { | |
Message: pingMsg, | |
Delay: time.Second * 27, | |
}) | |
+ | |
if ok.IsWebsocketAuthenticationSupported() { | |
var authDialer websocket.Dialer | |
authDialer.ReadBufferSize = 8192 | |
@@ -1691,10 +1692,13 @@ func (ok *Okx) WsAmendMultipleOrders(args []AmendOrderRequestParams) ([]OrderDat | |
} | |
// Run this functions distributes websocket request responses to | |
-func (m *wsRequestDataChannelsMultiplexer) Run() { | |
+func (m *wsRequestDataChannelsMultiplexer) Run(ok *Okx) { | |
+ defer ok.Websocket.Wg.Done() | |
tickerData := time.NewTicker(time.Second) | |
for { | |
select { | |
+ case <-ok.Websocket.ShutdownC: | |
+ return | |
case <-tickerData.C: | |
for x, myChan := range m.WsResponseChannelsMap { | |
if myChan == nil { | |
diff --git a/exchanges/stream/websocket.go b/exchanges/stream/websocket.go | |
index 1b674a4bc..cd3c33c8f 100644 | |
--- a/exchanges/stream/websocket.go | |
+++ b/exchanges/stream/websocket.go | |
@@ -392,10 +392,10 @@ func (w *Websocket) connectionMonitor() error { | |
case err := <-w.ReadMessageErrors: | |
if isDisconnectionError(err) { | |
w.setInit(false) | |
+ w.shutdownCleanup() | |
log.Warnf(log.WebsocketMgr, | |
"%v websocket has been disconnected. Reason: %v", | |
w.exchangeName, err) | |
- w.setConnectedStatus(false) | |
} else { | |
// pass off non disconnect errors to datahandler to manage | |
w.DataHandler <- err | |
@@ -423,8 +423,6 @@ func (w *Websocket) connectionMonitor() error { | |
// Shutdown attempts to shut down a websocket connection and associated routines | |
// by using a package defined shutdown function | |
func (w *Websocket) Shutdown() error { | |
- w.m.Lock() | |
- defer w.m.Unlock() | |
if !w.IsConnected() { | |
return fmt.Errorf("%v websocket: cannot shutdown %w", | |
@@ -443,9 +441,6 @@ func (w *Websocket) Shutdown() error { | |
"%v websocket: shutting down websocket\n", | |
w.exchangeName) | |
} | |
- | |
- defer w.Orderbook.FlushBuffer() | |
- | |
if w.Conn != nil { | |
if err := w.Conn.Shutdown(); err != nil { | |
return err | |
@@ -457,6 +452,14 @@ func (w *Websocket) Shutdown() error { | |
return err | |
} | |
} | |
+ w.shutdownCleanup() | |
+ return nil | |
+} | |
+ | |
+func (w *Websocket) shutdownCleanup() { | |
+ w.m.Lock() | |
+ defer w.m.Unlock() | |
+ defer w.Orderbook.FlushBuffer() | |
// flush any subscriptions from last connection if needed | |
w.subscriptionMutex.Lock() | |
@@ -473,7 +476,6 @@ func (w *Websocket) Shutdown() error { | |
"%v websocket: completed websocket shutdown\n", | |
w.exchangeName) | |
} | |
- return nil | |
} | |
// FlushChannels flushes channel subscriptions when there is a pair/asset change |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment