Skip to content

Instantly share code, notes, and snippets.

@candidosales
Last active June 29, 2021 19:19
Show Gist options
  • Save candidosales/f75a4ffa4df45f7119591a2f5171d234 to your computer and use it in GitHub Desktop.
Save candidosales/f75a4ffa4df45f7119591a2f5171d234 to your computer and use it in GitHub Desktop.
Use batching and goroutines to parallelize processing
// BatchFunc is the callback invoked by batch once per batch. start and end
// delimit the half-open index range [start, end) of the items that belong to
// the current batch. A non-nil error stops the iteration.
type BatchFunc func(start, end int) error

// Size limits applied by BulkDelete.
const (
	// MaxBatchSize is the batch size BulkDelete passes to batch, i.e. the
	// largest number of items handed to bulkDeleteInBatch at once.
	MaxBatchSize = 25

	// MaxBulkDelete is the largest number of items BulkDelete accepts in a
	// single request; longer requests are rejected.
	MaxBulkDelete = 1000
)
// BulkDelete deletes the given business metrics in consecutive batches of at
// most MaxBatchSize items; each batch is handed to bulkDeleteInBatch.
//
// A request with more than MaxBulkDelete items is rejected up front with a
// verrors.ResourceExhausted error. The per-item results of every batch are
// appended to the response's BusinessMetrics.
//
// The returned response is never nil: when a batch fails, the results
// collected so far are returned together with that error.
func (s service) BulkDelete(ctx context.Context, businessMetrics []*vanalytics_v1.DeleteBusinessMetricRequest) (*vanalytics_v1.DeleteMultiBusinessMetricsResponse, error) {
	response := &vanalytics_v1.DeleteMultiBusinessMetricsResponse{}

	if len(businessMetrics) > MaxBulkDelete {
		// The limit is formatted from the constant so the message cannot
		// drift from the check above when MaxBulkDelete changes.
		// NOTE(review): assumes verrors.New formats its arguments
		// printf-style, as the existing %d verb implies — confirm.
		return response, verrors.New(verrors.ResourceExhausted, "the request accept only %d items. Your request has %d", MaxBulkDelete, len(businessMetrics))
	}

	err := batch(len(businessMetrics), MaxBatchSize, func(start, end int) error {
		batchItems := businessMetrics[start:end]
		if len(batchItems) == 0 {
			// Nothing to delete in this range.
			return nil
		}

		responses, err := s.bulkDeleteInBatch(ctx, batchItems)
		if err != nil {
			return err
		}
		response.BusinessMetrics = append(response.BusinessMetrics, responses...)
		return nil
	})

	return response, err
}
// bulkDeleteInBatch calls s.Delete for every item of businessMetrics
// concurrently, one goroutine per item, and blocks until all of them have
// finished.
//
// The returned slice holds exactly one response per request, in the same order
// as businessMetrics. A failed delete does not abort the batch: the failure is
// recorded in the Error field of that item's response. The returned error is
// always nil. An empty input returns an empty, non-nil slice immediately.
//
// Reference
// - https://brunoscheufler.com/blog/2019-09-21-parallelized-batch-processing-in-go
// - https://medium.com/rungo/anatomy-of-channels-in-go-concurrency-in-go-1ec336086adb#6e3e
func (s service) bulkDeleteInBatch(ctx context.Context, businessMetrics []*vanalytics_v1.DeleteBusinessMetricRequest) ([]*vanalytics_v1.DeleteBusinessMetricResponse, error) {
	// One slot per request. Every goroutine writes only to its own index, so
	// no channel or lock is needed and the results keep the input order.
	responses := make([]*vanalytics_v1.DeleteBusinessMetricResponse, len(businessMetrics))

	var wg sync.WaitGroup
	wg.Add(len(businessMetrics))
	for i, bm := range businessMetrics {
		// Pass the loop variables as arguments so each goroutine works on its
		// own copy regardless of the Go version's loop-variable semantics.
		go func(i int, bm *vanalytics_v1.DeleteBusinessMetricRequest) {
			defer wg.Done()

			resp := &vanalytics_v1.DeleteBusinessMetricResponse{
				Id: bm.Id,
			}
			if err := s.Delete(ctx, bm.Id); err != nil {
				resp.Error = &vanalytics_v1.Error{
					Code:    int64(verrors.FromError(err).ErrorType()),
					Message: err.Error(),
				}
			}
			responses[i] = resp
		}(i, bm)
	}

	// Wait establishes that every write to responses happened before the
	// slice is returned to the caller.
	wg.Wait()

	return responses, nil
}
// errBatchDone is the sentinel an eachFn callback returns (directly or
// wrapped) to make batch stop early without reporting a failure.
var errBatchDone = errors.New("done")

// batch splits the index range [0, count) into consecutive chunks of at most
// batchSize items and calls eachFn(start, end) for each half-open chunk
// [start, end), in ascending order. The last chunk may be shorter.
//
// If eachFn returns errBatchDone, iteration stops and batch returns nil. Any
// other non-nil error stops iteration and is returned unchanged. When count is
// not positive, eachFn is never called and batch returns nil. A non-positive
// batchSize with a positive count is rejected with an error, since no progress
// could be made.
//
// Reference:
// - https://pace.dev/blog/2020/02/13/batching-operations-in-go-by-mat-ryer.html
// - https://github.com/pacedotdev/batch/pull/2
func batch(count, batchSize int, eachFn BatchFunc) error {
	if count <= 0 {
		return nil
	}
	if batchSize <= 0 {
		return errors.New("batch: batchSize must be greater than zero")
	}

	for start := 0; start < count; {
		// Compare against the remaining item count instead of computing
		// start+batchSize first, so a huge batchSize cannot overflow.
		end := count
		if batchSize < count-start {
			end = start + batchSize
		}

		if err := eachFn(start, end); err != nil {
			// Comparing against a fresh errors.New value never matches;
			// the shared sentinel is what makes the early stop work.
			if errors.Is(err, errBatchDone) {
				return nil
			}
			return err
		}
		start = end
	}
	return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment