Skip to content

Instantly share code, notes, and snippets.

@jkowalski
Created May 23, 2019 21:44
Show Gist options
  • Save jkowalski/0d22427e40296f14d03ebbed59891a1d to your computer and use it in GitHub Desktop.
Save jkowalski/0d22427e40296f14d03ebbed59891a1d to your computer and use it in GitHub Desktop.
batch processing algorithm
type requestWithResponseChannel struct {
request allocationRequest
ch chan allocationResponse
}
var (
pendingRequests []requestWithResponseChannel
isProcessing bool
)
func processRequest(req allocationRequest) allocationResponse {
// make a channel through which the response will be returned by another goroutine
ch := make(chan allocationResponse, 1)
// all requests add themselves to 'pendingRequests'
mutex.Lock()
pendingRequests.add(requestWithResponseChannel{req, ch})
if !isProcessing {
go processBatchedRequests(pendingRequests)
pendingRequests = nil
}
mutex.Unlock()
return <-ch
}
func processBatchedRequests(items []requestWithResponseChannel) {
for {
// TODO - process items, for each item send its response to its channel
for _, it := =range items {
it.ch <- allocationResponse{...}
}
// after processing the full batch see if there are any more requests
mutex.Lock()
if len(pendingRequests) == 0 {
// no more requests, set isProcessing to false so that the next
// request will launch another goroutine
isProcessing = false
mutex.Unlock()
return
}
// in another loop iteration new pending requests
items = pendingRequests
pendingRequests = nil
mutex.Unlock()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment