Skip to content

Instantly share code, notes, and snippets.

@ajm188
Created December 8, 2020 01:00
Show Gist options
  • Save ajm188/cee1f237dfe16f68241531f9ebcaec31 to your computer and use it in GitHub Desktop.
Save ajm188/cee1f237dfe16f68241531f9ebcaec31 to your computer and use it in GitHub Desktop.
ShowAllKeyspaces stream
func (s *VtctldServer) ShowAllKeyspaces(req *vtctldatapb.ShowAllKeyspacesRequest, stream vtctlservicepb.Vtctld_ShowAllKeyspacesServer) error {
ctx := stream.Context()
keyspaces, err := s.ts.GetKeyspaces(ctx)
if err != nil {
return err
}
wg := sync.WaitGroup{}
ch := make(chan string)
results := make(chan *vtctldatapb.Keyspace, len(keyspaces))
er := concurrency.AllErrorRecorder{}
wg.Add(1)
go func() {
defer wg.Done()
defer close(results) // tell the stream sender goroutine when we're done producing results
for keyspace := range ch {
ks, err := s.GetKeyspace(ctx, &vtctldatapb.GetKeyspaceRequest{Keyspace: keyspace})
if err != nil {
er.RecordError(err)
return
}
if er.HasErrors() {
return
}
results <- ks
}
}()
wg.Add(1)
go func() {
defer wg.Done()
for ks := range results {
if err := stream.Send(ks); err != nil {
er.RecordError(err)
return
}
}
}()
done := make(chan bool)
go func() {
wg.Wait()
done <- true
}()
for _, keyspace := range keyspaces {
select {
// In the normal path, this should never block.
case ch <- keyspace:
case <-ctx.Done():
close(ch)
return ctx.Err()
case <-done:
// If we're done before we sent all the keyspaces, then someone else
// had an error.
close(ch)
return er.Error()
}
}
// We sent our work successfully, close the input channel. We never need to
// close the results channel, because that is managed by the producer
// goroutine.
close(ch)
<-done
return er.Error()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment