Skip to content

Instantly share code, notes, and snippets.

@tauraamui
Last active July 7, 2021 06:35
Show Gist options
  • Save tauraamui/d553e248ba30d4a403dfaaeb97f34bde to your computer and use it in GitHub Desktop.
Save tauraamui/d553e248ba30d4a403dfaaeb97f34bde to your computer and use it in GitHub Desktop.
Dragon Daemon re-connect logic with test
It("Should attempt to reconnect until read eventually returns ok", func() {
	// Capture every info-level log line produced while streaming so the
	// reconnect attempts can be asserted on once the stream has stopped.
	infoLogs := []string{}
	resetLogInfo := media.OverloadLogInfo(func(format string, args ...interface{}) {
		// args must be expanded with "..."; passing the slice itself makes
		// Sprintf render it as a single "[...]" operand, which wraps every
		// formatted value in an extra pair of brackets.
		infoLogs = append(infoLogs, fmt.Sprintf(format, args...))
	})
	defer resetLogInfo()

	// processCallCount is bumped by every read and every (failed) open
	// attempt; the driver goroutine below uses it as a crude clock.
	// NOTE(review): processCallCount and ableToRead are written here and read
	// by the driver goroutine without synchronisation; readFunc is presumably
	// invoked from the streaming goroutine, which would make this a data race
	// under -race. Confirm and guard with a mutex/atomic if so.
	processCallCount := 0
	ableToRead := true
	videoCapture.readFunc = func(m *gocv.Mat) bool {
		processCallCount++
		return ableToRead
	}

	// Each helper switches a failure mode on and returns the function that
	// switches it back off.
	makeVidCapReturnFalseFromRead := func() func() {
		ableToRead = false
		return func() { ableToRead = true }
	}
	makeOpenVidCapReturnError := func() func() {
		return media.OverloadOpenVideoCapture(
			func(string, string, int, bool, string) (media.VideoCapturable, error) {
				processCallCount++
				return nil, errors.New("test unable to open video connection")
			},
		)
	}

	ctx, cancelStreaming := context.WithCancel(context.Background())
	stopping := conn.Stream(ctx)

	wg := sync.WaitGroup{}
	wg.Add(1)
	go func(wg *sync.WaitGroup) {
		madeVidCapReturnFalse := false
		madeOpenVidCapReturnError := false
		var resetMakeVidCapReturnFalse, resetMakeOpenVidCapReturnError func()
		defer wg.Done()
		for processCallCount < 50 {
			// initial 20 reads read as normal
			// after 20 reads and less than 30 reads make read fail
			if processCallCount > 20 && processCallCount < 30 {
				if !madeVidCapReturnFalse {
					resetMakeVidCapReturnFalse = makeVidCapReturnFalseFromRead()
					madeVidCapReturnFalse = true
				}
				if !madeOpenVidCapReturnError {
					resetMakeOpenVidCapReturnError = makeOpenVidCapReturnError()
					madeOpenVidCapReturnError = true
				}
			}
			// past 30 calls restore both behaviours so the next reconnect
			// attempt succeeds and reads return ok again
			if processCallCount > 30 {
				if madeVidCapReturnFalse && resetMakeVidCapReturnFalse != nil {
					resetMakeVidCapReturnFalse()
					madeVidCapReturnFalse = false
				}
				if madeOpenVidCapReturnError && resetMakeOpenVidCapReturnError != nil {
					resetMakeOpenVidCapReturnError()
					madeOpenVidCapReturnError = false
				}
			}
		}
		cancelStreaming()
	}(&wg)
	wg.Wait()

	Eventually(stopping).Should(BeClosed())

	// Ten failed reconnect attempts followed by the successful one.
	Expect(infoLogs).To(HaveLen(11))
	Expect(infoLogs).To(HaveCap(16))
	for i := 0; i < 10; i++ {
		Expect(infoLogs[i]).To(Equal("Attempting to reconnect to [TestConnectionInstance]"))
	}
	Expect(infoLogs[10]).To(Equal("Re-connected to [TestConnectionInstance]..."))
	// Expect(infoLogs).To(ContainElements(logsToMatch))
})
// stream starts a goroutine that repeatedly reads frames from the connection
// into a single image mat until ctx is cancelled, and returns the channel that
// is handed to shutdownStreaming (presumably closed there to signal that
// streaming has stopped; confirm against shutdownStreaming).
//
// Every loop iteration does exactly one of:
//   - ctx cancelled: run shutdownStreaming once and exit the goroutine;
//   - a value is waiting on c.attemptToReconnect: if it is true, try to
//     reconnect and queue the outcome (true = still needs reconnecting);
//   - otherwise: read a frame and queue the negated result, so a failed read
//     requests a reconnect on the next iteration.
//
// The goroutine both sends to and receives from c.attemptToReconnect, so this
// assumes the channel is buffered with capacity >= 1; an unbuffered channel
// would block on the first send.
func (c *Connection) stream(ctx context.Context) chan struct{} {
	log.Debug("Opening root image mat")
	img := gocv.NewMat()
	stopping := make(chan struct{})

	go func(ctx context.Context, stopping chan struct{}) {
		for {
			// throttle CPU usage
			time.Sleep(time.Millisecond * 1)
			select {
			case <-ctx.Done():
				shutdownStreaming(c, &img, stopping)
				// Exit instead of looping: a cancelled context stays ready
				// forever, so continuing would leak this goroutine and let
				// select pick the reconnect case after shutdown has run.
				return
			case reconnect := <-c.attemptToReconnect:
				if reconnect {
					// if unsuccessful send reconnect message to process
					c.attemptToReconnect <- tryReconnectStream(c)
				}
			default:
				c.attemptToReconnect <- !readFromStream(c, &img)
			}
		}
	}(ctx, stopping)

	return stopping
}
// tryReconnectStream makes a single reconnect attempt on c and reports
// whether another attempt is still required: true when c.reconnect returned
// an error, false once the connection has been re-established. The attempt
// and its outcome are logged.
func tryReconnectStream(c *Connection) bool {
	log.Info("Attempting to reconnect to [%s]", c.title)

	if err := c.reconnect(); err != nil {
		log.Error("Unable to reconnect to [%s]... ERROR: %v", c.title, err)
		return true
	}

	log.Info("Re-connected to [%s]...", c.title)
	return false
}
// readFromStream reads one frame from c's video capture into img and offers a
// clone of it to c.buffer without blocking.
//
// It returns false when the capture is not opened or the read fails, and true
// once a frame has been read, whether or not the buffer had room for it. A
// clone that cannot be queued is closed immediately.
func readFromStream(c *Connection, img *gocv.Mat) bool {
	if !c.vc.IsOpened() {
		return false
	}

	if !c.vc.Read(img) {
		log.Warn("Connection for stream at [%s] closed", c.title)
		return false
	}

	frame := img.Clone()
	select {
	case c.buffer <- frame:
		log.Debug("Sending read from to buffer...")
	default:
		// Nobody is ready to receive this frame; close the clone here since
		// it is not handed on to the buffer.
		frame.Close()
		log.Debug("Buffer full...")
	}
	return true
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment