Skip to content

Instantly share code, notes, and snippets.

@candidosales
Last active June 29, 2021 19:19

Revisions

  1. candidosales revised this gist Jun 29, 2021. 1 changed file with 2 additions and 0 deletions.
    2 changes: 2 additions & 0 deletions batch_parellel_processing.go
    Original file line number Diff line number Diff line change
    @@ -1,3 +1,5 @@
    type BatchFunc func(start, end int) error

    const (
    MaxBulkDelete = 1000
    MaxBatchSize = 25
  2. candidosales revised this gist Jun 29, 2021. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion batch_parellel_processing.go
    Original file line number Diff line number Diff line change
    @@ -45,7 +45,7 @@ func (s service) bulkDeleteInBatch(ctx context.Context, businessMetrics []*vanal

    for index, bm := range businessMetrics {

    go func(bm *vanalytics_v1.DeleteBusinessMetricRequest, id int) {
    go func(bm *vanalytics_v1.DeleteBusinessMetricRequest, id int) { // have to wrap with a go routine function
    defer itemProcessingGroup.Done() // final of the processing for each item decrement
    businessMetricsResponse := &vanalytics_v1.DeleteBusinessMetricResponse{
    Id: bm.Id,
  3. candidosales renamed this gist Aug 17, 2020. 1 changed file with 0 additions and 0 deletions.
  4. candidosales revised this gist Jul 16, 2020. 1 changed file with 6 additions and 0 deletions.
    6 changes: 6 additions & 0 deletions bath_parellel_processing.go
    Original file line number Diff line number Diff line change
    @@ -1,3 +1,9 @@
    const (
    MaxBulkDelete = 1000
    MaxBatchSize = 25
    )


    func (s service) BulkDelete(ctx context.Context, businessMetrics []*vanalytics_v1.DeleteBusinessMetricRequest) (*vanalytics_v1.DeleteMultiBusinessMetricsResponse, error) {
    response := &vanalytics_v1.DeleteMultiBusinessMetricsResponse{}

  5. candidosales revised this gist Jul 10, 2020. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion bath_parellel_processing.go
    Original file line number Diff line number Diff line change
    @@ -64,7 +64,7 @@ func (s service) bulkDeleteInBatch(ctx context.Context, businessMetrics []*vanal
    businessMetricsResponse := <-resultsChan // receive all results from channel
    responses = append(responses, businessMetricsResponse)

    // if we've reached the expected amount of urls then stop
    // if we've reached the expected amount of items then stop
    if len(responses) == len(businessMetrics) {
    close(resultsChan) // ensure to close the channel
    break
  6. candidosales revised this gist Jul 10, 2020. 1 changed file with 0 additions and 1 deletion.
    1 change: 0 additions & 1 deletion bath_parellel_processing.go
    Original file line number Diff line number Diff line change
    @@ -31,7 +31,6 @@ func (s service) BulkDelete(ctx context.Context, businessMetrics []*vanalytics_v
    // Reference
    // - https://brunoscheufler.com/blog/2019-09-21-parallelized-batch-processing-in-go
    // - https://medium.com/rungo/anatomy-of-channels-in-go-concurrency-in-go-1ec336086adb#6e3e

    func (s service) bulkDeleteInBatch(ctx context.Context, businessMetrics []*vanalytics_v1.DeleteBusinessMetricRequest) ([]*vanalytics_v1.DeleteBusinessMetricResponse, error) {
    resultsChan := make(chan *vanalytics_v1.DeleteBusinessMetricResponse, len(businessMetrics)) // create a Channel

  7. candidosales revised this gist Jul 10, 2020. 1 changed file with 4 additions and 0 deletions.
    4 changes: 4 additions & 0 deletions bath_parellel_processing.go
    Original file line number Diff line number Diff line change
    @@ -28,6 +28,10 @@ func (s service) BulkDelete(ctx context.Context, businessMetrics []*vanalytics_v
    return response, nil
    }

    // Reference
    // - https://brunoscheufler.com/blog/2019-09-21-parallelized-batch-processing-in-go
    // - https://medium.com/rungo/anatomy-of-channels-in-go-concurrency-in-go-1ec336086adb#6e3e

    func (s service) bulkDeleteInBatch(ctx context.Context, businessMetrics []*vanalytics_v1.DeleteBusinessMetricRequest) ([]*vanalytics_v1.DeleteBusinessMetricResponse, error) {
    resultsChan := make(chan *vanalytics_v1.DeleteBusinessMetricResponse, len(businessMetrics)) // create a Channel

  8. candidosales revised this gist Jul 10, 2020. 1 changed file with 4 additions and 0 deletions.
    4 changes: 4 additions & 0 deletions bath_parellel_processing.go
    Original file line number Diff line number Diff line change
    @@ -70,6 +70,10 @@ func (s service) bulkDeleteInBatch(ctx context.Context, businessMetrics []*vanal
    return responses, nil
    }


    // Reference:
    // - https://pace.dev/blog/2020/02/13/batching-operations-in-go-by-mat-ryer.html
    // - https://github.com/pacedotdev/batch/pull/2
    func batch(count, batchSize int, eachFn BatchFunc) error {
    for i := 0; i < count; i += batchSize {
    j := i + batchSize
  9. candidosales created this gist Jul 10, 2020.
    88 changes: 88 additions & 0 deletions bath_parellel_processing.go
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,88 @@
    func (s service) BulkDelete(ctx context.Context, businessMetrics []*vanalytics_v1.DeleteBusinessMetricRequest) (*vanalytics_v1.DeleteMultiBusinessMetricsResponse, error) {
    response := &vanalytics_v1.DeleteMultiBusinessMetricsResponse{}

    if len(businessMetrics) > MaxBulkDelete {
    return response, verrors.New(verrors.ResourceExhausted, "the request accept only 1000 items. Your request has %d", len(businessMetrics))
    }

    err := batch(len(businessMetrics), MaxBatchSize, func(start, end int) error {
    batchItems := businessMetrics[start:end]

    if len(batchItems) > 0 {
    responses, err := s.bulkDeleteInBatch(ctx, batchItems)

    if err != nil {
    return err
    }

    response.BusinessMetrics = append(response.BusinessMetrics, responses...)
    }

    return nil
    })

    if err != nil {
    return response, err
    }

    return response, nil
    }

    func (s service) bulkDeleteInBatch(ctx context.Context, businessMetrics []*vanalytics_v1.DeleteBusinessMetricRequest) ([]*vanalytics_v1.DeleteBusinessMetricResponse, error) {
    resultsChan := make(chan *vanalytics_v1.DeleteBusinessMetricResponse, len(businessMetrics)) // create a Channel

    var itemProcessingGroup sync.WaitGroup // create a sync to wait the processing
    itemProcessingGroup.Add(len(businessMetrics)) // how many items to process

    for index, bm := range businessMetrics {

    go func(bm *vanalytics_v1.DeleteBusinessMetricRequest, id int) {
    defer itemProcessingGroup.Done() // final of the processing for each item decrement
    businessMetricsResponse := &vanalytics_v1.DeleteBusinessMetricResponse{
    Id: bm.Id,
    }

    // Process item...
    err := s.Delete(ctx, bm.Id)
    if err != nil {
    businessMetricsResponse.Error = &vanalytics_v1.Error{
    Code: int64(verrors.FromError(err).ErrorType()),
    Message: err.Error(),
    }
    }
    resultsChan <- businessMetricsResponse
    }(bm, index)
    }

    itemProcessingGroup.Wait() // wait until all the processes reach zero
    responses := []*vanalytics_v1.DeleteBusinessMetricResponse{}

    for {
    businessMetricsResponse := <-resultsChan // receive all results from channel
    responses = append(responses, businessMetricsResponse)

    // if we've reached the expected amount of urls then stop
    if len(responses) == len(businessMetrics) {
    close(resultsChan) // ensure to close the channel
    break
    }
    }
    return responses, nil
    }

    func batch(count, batchSize int, eachFn BatchFunc) error {
    for i := 0; i < count; i += batchSize {
    j := i + batchSize
    if j > count {
    j = count
    }
    err := eachFn(i, j)
    if err == errors.New("done") {
    return nil
    }
    if err != nil {
    return err
    }
    }
    return nil
    }