Skip to content

Instantly share code, notes, and snippets.

@bluebrown
Created December 21, 2021 09:47
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bluebrown/f0401fd7c1941985101a35085c041502 to your computer and use it in GitHub Desktop.
Save bluebrown/f0401fd7c1941985101a35085c041502 to your computer and use it in GitHub Desktop.
Go HTTP Batch Request Handler
import (
"bufio"
"bytes"
"fmt"
"io"
"log"
"mime/multipart"
"net/http"
"net/http/httptest"
"net/textproto"
"sync"
)
/*
The batch handler implements a multipart/mixed request handler
similar to the one described at https://developers.google.com/gmail/api/guides/batch,
except that each subrequest must include the HTTP/1.1 protocol version in its request line.
The Server struct must contain at least the router that handles the subrequests,
for example a *mux.Router.
The router should have the routes for the subrequest paths registered.
Example Request:
POST /batch HTTP/1.1
Host: localhost:8080
Content-Type: multipart/mixed; boundary=boundary
--boundary
Content-Type: application/http
Content-ID: <item1>
POST /items HTTP/1.1
Content-Type: application/json
Content-length: 58
Accept: application/json
{ "name": "batch1", "description": "batch1 description" }
--boundary
Content-Type: application/http
Content-ID: <item2>
POST /items HTTP/1.1
Content-Type: application/json
Content-length: 58
Accept: application/json
{ "name": "broken", "descr
--boundary--
*/
// batchErrReader is an io.Reader that always fails with err. It is used to
// replay a body read error to the subrequest handler after the readable
// prefix of the body has been buffered.
type batchErrReader struct{ err error }

// Read implements io.Reader and always returns the stored error.
func (e batchErrReader) Read([]byte) (int, error) { return 0, e.err }

// bufferBatchBody reads body completely into memory and returns a replacement
// body that yields the same bytes, followed by the same read error if one
// occurred.
//
// Buffering is required because a multipart part stops being readable as soon
// as the next part is requested, while the subrequests are only executed after
// all parts have been parsed.
//
// NOTE(review): the body is held in memory without a size cap of its own;
// bound the overall batch request (for example with http.MaxBytesReader)
// where that matters.
func bufferBatchBody(body io.ReadCloser) io.ReadCloser {
	data, err := io.ReadAll(body)
	if err != nil {
		// Keep per-subrequest semantics: the handler sees the partial
		// data and then the original error (e.g. a truncated body).
		return io.NopCloser(io.MultiReader(bytes.NewReader(data), batchErrReader{err}))
	}
	return io.NopCloser(bytes.NewReader(data))
}

// HandleBatch returns a handler that executes a batch of HTTP requests sent
// as a single multipart request.
//
// Every part must have the Content-Type "application/http" and contain one
// complete HTTP/1.1 request. Once all parts have been parsed successfully the
// subrequests are served concurrently through s.Router, and their responses
// are returned as a multipart/mixed body, one "application/http" part per
// subrequest, in the order of the request parts. The Content-ID of a response
// part is the Content-ID of the request part prefixed with "response-".
//
// The whole batch is rejected with
//   - 400 if the request is not a readable multipart request, if it has more
//     than 100 parts, or if a part is not of type application/http;
//   - 422 if a part cannot be read or does not contain a parsable request.
//
// Failures of individual subrequests are reported inside their own response
// part and do not affect the status of the batch response.
func (s *Server) HandleBatch() http.HandlerFunc {
	// maxParts is the largest number of subrequests accepted in one batch.
	const maxParts = 100

	// BatchOperation is a parsed subrequest waiting to be executed.
	type BatchOperation struct {
		ContentID string
		Request   *http.Request
	}

	// BatchResult is the recorded response of one subrequest.
	type BatchResult struct {
		ContentID string
		Response  *http.Response
	}

	return func(w http.ResponseWriter, r *http.Request) {
		reader, err := r.MultipartReader()
		if err != nil {
			// A request that is not multipart is a client error. Returning
			// here is essential: reader is nil on error.
			http.Error(w, fmt.Sprintf("could not create a multipart reader %v\n", err), http.StatusBadRequest)
			return
		}

		// Parse and validate every part before executing anything, so an
		// invalid batch is rejected without side effects.
		var ops []BatchOperation
		for partNo := 1; ; partNo++ {
			requestPart, err := reader.NextPart()
			if err == io.EOF {
				break
			}
			if err != nil {
				http.Error(w, fmt.Sprintf("part %d is not readable: %v\n", partNo, err), http.StatusUnprocessableEntity)
				return
			}
			// Checked only after a part was actually read, so a batch with
			// exactly maxParts parts is accepted.
			if partNo > maxParts {
				http.Error(w, "too many parts: max 100 parts are allowed", http.StatusBadRequest)
				return
			}
			if requestPart.Header.Get("Content-Type") != "application/http" {
				http.Error(w, fmt.Sprintf("content type of part %d is not application/http", partNo), http.StatusBadRequest)
				return
			}

			req, err := http.ReadRequest(bufio.NewReader(requestPart))
			if err != nil {
				http.Error(w, fmt.Sprintf("could not parse the sub request of part %d: %v\n", partNo, err), http.StatusUnprocessableEntity)
				return
			}
			// The part becomes unreadable on the next NextPart call, so the
			// body has to be captured now.
			req.Body = bufferBatchBody(req.Body)

			ops = append(ops, BatchOperation{
				ContentID: requestPart.Header.Get("Content-ID"),
				// Tie the subrequest to the batch request so it is
				// cancelled together with it.
				Request: req.WithContext(r.Context()),
			})
		}

		// Execute all subrequests concurrently. Each goroutine writes only
		// to its own slot, which also keeps the results in request order.
		results := make([]BatchResult, len(ops))
		var wg sync.WaitGroup
		for i, op := range ops {
			wg.Add(1)
			go func(i int, op BatchOperation) {
				defer wg.Done()

				rec := httptest.NewRecorder()
				func() {
					// net/http does not recover panics raised in goroutines
					// it did not start; without this a panicking handler
					// would terminate the whole process.
					defer func() {
						if p := recover(); p != nil {
							log.Printf("subrequest %s panicked: %v\n", op.ContentID, p)
							rec = httptest.NewRecorder()
							rec.WriteHeader(http.StatusInternalServerError)
						}
					}()
					s.Router.ServeHTTP(rec, op.Request)
				}()

				response := rec.Result()
				// Make Response.Write emit a Content-Length header.
				response.ContentLength = int64(rec.Body.Len())
				results[i] = BatchResult{
					ContentID: op.ContentID,
					Response:  response,
				}
			}(i, op)
		}
		wg.Wait()

		// Assemble the multipart response in memory.
		body := &bytes.Buffer{}
		writer := multipart.NewWriter(body)
		for _, result := range results {
			responsePart, err := writer.CreatePart(textproto.MIMEHeader{
				"Content-Type": {"application/http"},
				"Content-ID":   {"response-" + result.ContentID},
			})
			if err != nil {
				log.Printf("skipping: could not prepare a response part for subrequest %s %v\n", result.ContentID, err)
				continue
			}
			if err := result.Response.Write(responsePart); err != nil {
				log.Printf("skipping: could not write response part for subrequest %s %v\n", result.ContentID, err)
			}
		}
		// Close appends the terminating boundary line.
		if err := writer.Close(); err != nil {
			log.Printf("could not finalize multipart response %v\n", err)
		}

		w.Header().Set("Content-Type", "multipart/mixed; boundary="+writer.Boundary())
		if _, err := w.Write(body.Bytes()); err != nil {
			log.Printf("could not write multipart response %v\n", err)
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment