Skip to content

Instantly share code, notes, and snippets.

@leonardo5621
Last active July 22, 2022 15:35
Show Gist options
  • Save leonardo5621/de7ed94eab9ad9ebc80708d13b5303eb to your computer and use it in GitHub Desktop.
Save leonardo5621/de7ed94eab9ad9ebc80708d13b5303eb to your computer and use it in GitHub Desktop.
// serve runs the subscription loop until ctx is cancelled.
//
// Every checkFrequency seconds it starts s.fetcher.Fetch in a background
// goroutine, unless a fetch is already in flight or a previously fetched item
// is still waiting to be delivered. A successfully fetched item is offered on
// s.updates; a failed fetch is logged and retried on a later tick.
//
// checkFrequency is expressed in seconds and must be positive, because
// time.NewTicker panics on a non-positive interval.
func (s *sub) serve(ctx context.Context, checkFrequency int) {
	clock := time.NewTicker(time.Duration(checkFrequency) * time.Second)
	// Release the ticker as soon as the loop exits.
	defer clock.Stop()

	type fetchResult struct {
		fetched Item
		err     error
	}
	// Capacity 1 lets the single in-flight fetch goroutine deliver its result
	// and exit even when serve has already returned.
	fetchDone := make(chan fetchResult, 1)

	var (
		fetched  Item // last successfully fetched item, awaiting delivery
		pending  bool // fetched holds an item not yet sent on s.updates
		fetching bool // a fetch goroutine is currently running
	)
	for {
		// A nil channel never becomes ready in a select, so leaving this nil
		// disables the send case while there is nothing to deliver.
		var fetchResponseStream chan Item
		if pending {
			fetchResponseStream = s.updates
		}

		select {
		// Clock that triggers the fetch.
		case <-clock.C:
			// Skip this tick while an item is undelivered or a fetch is
			// still running; starting another fetch could strand a goroutine
			// on fetchDone or overwrite the undelivered item.
			if pending || fetching {
				break
			}
			fetching = true
			go func() {
				item, err := s.fetcher.Fetch()
				fetchDone <- fetchResult{item, err}
			}()

		// The fetch result is ready to be consumed.
		case result := <-fetchDone:
			fetching = false
			if result.err != nil {
				log.Printf("Fetch error: %v \n Waiting the next iteration", result.err)
				break
			}
			// Only keep the item when the fetch succeeded.
			fetched = result.fetched
			pending = true

		// The item was handed to the consumer.
		case fetchResponseStream <- fetched:
			pending = false

		// The caller asked the loop to stop.
		case <-ctx.Done():
			return
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment