Last active
July 7, 2021 06:35
-
-
Save tauraamui/d553e248ba30d4a403dfaaeb97f34bde to your computer and use it in GitHub Desktop.
Dragon Daemon re-connect logic with test
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
It("Should attempt to reconnect until read eventually returns ok", func() { | |
infoLogs := []string{} | |
resetLogInfo := media.OverloadLogInfo(func(format string, args ...interface{}) { | |
// fmt.Printf("INFO LOG FORMAT: %s\n", format) | |
infoLogs = append(infoLogs, fmt.Sprintf(format, args)) | |
}) | |
defer resetLogInfo() | |
processCallCount := 0 | |
ableToRead := true | |
videoCapture.readFunc = func(m *gocv.Mat) bool { | |
processCallCount++ | |
return ableToRead | |
} | |
makeVidCapReturnFalseFromRead := func() func() { ableToRead = false; return func() { ableToRead = true } } | |
makeOpenVidCapReturnError := func() func() { | |
return media.OverloadOpenVideoCapture( | |
func(string, string, int, bool, string) (media.VideoCapturable, error) { | |
processCallCount++ | |
return nil, errors.New("test unable to open video connection") | |
}, | |
) | |
} | |
ctx, cancelStreaming := context.WithCancel(context.Background()) | |
stopping := conn.Stream(ctx) | |
wg := sync.WaitGroup{} | |
wg.Add(1) | |
go func(wg *sync.WaitGroup) { | |
madeVidCapReturnFalse := false | |
madeOpenVidCapReturnError := false | |
var resetMakeVidCapReturnFalse, resetMakeOpenVidCapReturnError func() | |
defer wg.Done() | |
for processCallCount < 50 { | |
// initial 20 reads read as normal | |
// after 20 reads and less than 30 reads make read fail | |
if processCallCount > 20 && processCallCount < 30 { | |
if !madeVidCapReturnFalse { | |
resetMakeVidCapReturnFalse = makeVidCapReturnFalseFromRead() | |
madeVidCapReturnFalse = true | |
} | |
if !madeOpenVidCapReturnError { | |
resetMakeOpenVidCapReturnError = makeOpenVidCapReturnError() | |
madeOpenVidCapReturnError = true | |
} | |
} | |
if processCallCount > 30 { | |
if madeVidCapReturnFalse && resetMakeVidCapReturnFalse != nil { | |
resetMakeVidCapReturnFalse() | |
madeVidCapReturnFalse = false | |
} | |
if madeOpenVidCapReturnError && resetMakeOpenVidCapReturnError != nil { | |
resetMakeOpenVidCapReturnError() | |
madeOpenVidCapReturnError = false | |
} | |
} | |
} | |
cancelStreaming() | |
}(&wg) | |
wg.Wait() | |
Eventually(stopping).Should(BeClosed()) | |
Expect(infoLogs).To(HaveLen(11)) | |
Expect(infoLogs).To(HaveCap(16)) | |
for i := 0; i < 10; i++ { | |
Expect(infoLogs[i]).To(Equal("Attempting to reconnect to [[TestConnectionInstance]]")) | |
} | |
Expect(infoLogs[10]).To(Equal("Re-connected to [[TestConnectionInstance]]...")) | |
// Expect(infoLogs).To(ContainElements(logsToMatch)) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
func (c *Connection) stream(ctx context.Context) chan struct{} { | |
log.Debug("Opening root image mat") | |
img := gocv.NewMat() | |
stopping := make(chan struct{}) | |
reachedShutdownCase := false | |
go func(ctx context.Context, stopping chan struct{}) { | |
for { | |
// throttle CPU usage | |
time.Sleep(time.Millisecond * 1) | |
select { | |
case <-ctx.Done(): | |
if !reachedShutdownCase { | |
reachedShutdownCase = true | |
shutdownStreaming(c, &img, stopping) | |
} | |
case reconnect := <-c.attemptToReconnect: | |
if reconnect { | |
// if unsuccessful send reconnect message to process | |
c.attemptToReconnect <- tryReconnectStream(c) | |
} | |
default: | |
c.attemptToReconnect <- !readFromStream(c, &img) | |
} | |
} | |
}(ctx, stopping) | |
return stopping | |
} | |
func tryReconnectStream(c *Connection) bool { | |
log.Info("Attempting to reconnect to [%s]", c.title) | |
err := c.reconnect() | |
if err != nil { | |
log.Error("Unable to reconnect to [%s]... ERROR: %v", c.title, err) | |
return true | |
} | |
log.Info("Re-connected to [%s]...", c.title) | |
return false | |
} | |
func readFromStream(c *Connection, img *gocv.Mat) bool { | |
if c.vc.IsOpened() { | |
if ok := c.vc.Read(img); !ok { | |
log.Warn("Connection for stream at [%s] closed", c.title) | |
return false | |
} | |
imgClone := img.Clone() | |
select { | |
case c.buffer <- imgClone: | |
log.Debug("Sending read from to buffer...") | |
default: | |
imgClone.Close() | |
log.Debug("Buffer full...") | |
} | |
return true | |
} | |
return false | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment