-
-
Save odeke-em/d5f1690f0343885d4ce780f975576b3b to your computer and use it in GitHub Desktop.
Differences in x/net/http2/transport.go between tip@66aacef3dd8a676686c7ae3716979581e8b03c47 and the working version@5961165da77ad3a2abf3a77ea904c13a76b0b073, which are 66 commits apart
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
--- tr_5961165da77ad3a2abf3a77ea904c13a76b0b073.go 2017-09-19 02:59:26.000000000 -0600 | |
+++ tr_master.go 2017-09-19 03:01:42.000000000 -0600 | |
@@ -18,6 +18,7 @@ | |
"io/ioutil" | |
"log" | |
"math" | |
+ mathrand "math/rand" | |
"net" | |
"net/http" | |
"sort" | |
@@ -86,7 +87,7 @@ | |
// MaxHeaderListSize is the http2 SETTINGS_MAX_HEADER_LIST_SIZE to | |
// send in the initial settings frame. It is how many bytes | |
- // of response headers are allow. Unlike the http2 spec, zero here | |
+ // of response headers are allowed. Unlike the http2 spec, zero here | |
// means to use a default limit (currently 10MB). If you actually | |
// want to advertise an ulimited value to the peer, Transport | |
// interprets the highest possible value here (0xffffffff or 1<<32-1) | |
@@ -164,15 +165,17 @@ | |
goAwayDebug string // goAway frame's debug data, retained as a string | |
streams map[uint32]*clientStream // client-initiated | |
nextStreamID uint32 | |
+ pendingRequests int // requests blocked and waiting to be sent because len(streams) == maxConcurrentStreams | |
pings map[[8]byte]chan struct{} // in flight ping data to notification channel | |
bw *bufio.Writer | |
br *bufio.Reader | |
fr *Framer | |
lastActive time.Time | |
// Settings from peer: (also guarded by mu) | |
- maxFrameSize uint32 | |
- maxConcurrentStreams uint32 | |
- initialWindowSize uint32 | |
+ maxFrameSize uint32 | |
+ maxConcurrentStreams uint32 | |
+ peerMaxHeaderListSize uint64 | |
+ initialWindowSize uint32 | |
hbuf bytes.Buffer // HPACK encoder writes into this | |
henc *hpack.Encoder | |
@@ -216,35 +219,45 @@ | |
resTrailer *http.Header // client's Response.Trailer | |
} | |
-// awaitRequestCancel runs in its own goroutine and waits for the user | |
-// to cancel a RoundTrip request, its context to expire, or for the | |
-// request to be done (any way it might be removed from the cc.streams | |
-// map: peer reset, successful completion, TCP connection breakage, | |
-// etc) | |
-func (cs *clientStream) awaitRequestCancel(req *http.Request) { | |
+// awaitRequestCancel waits for the user to cancel a request or for the done | |
+// channel to be signaled. A non-nil error is returned only if the request was | |
+// canceled. | |
+func awaitRequestCancel(req *http.Request, done <-chan struct{}) error { | |
ctx := reqContext(req) | |
if req.Cancel == nil && ctx.Done() == nil { | |
- return | |
+ return nil | |
} | |
select { | |
case <-req.Cancel: | |
- cs.cancelStream() | |
- cs.bufPipe.CloseWithError(errRequestCanceled) | |
+ return errRequestCanceled | |
case <-ctx.Done(): | |
+ return ctx.Err() | |
+ case <-done: | |
+ return nil | |
+ } | |
+} | |
+ | |
+// awaitRequestCancel waits for the user to cancel a request, its context to | |
+// expire, or for the request to be done (any way it might be removed from the | |
+// cc.streams map: peer reset, successful completion, TCP connection breakage, | |
+// etc). If the request is canceled, then cs will be canceled and closed. | |
+func (cs *clientStream) awaitRequestCancel(req *http.Request) { | |
+ if err := awaitRequestCancel(req, cs.done); err != nil { | |
cs.cancelStream() | |
- cs.bufPipe.CloseWithError(ctx.Err()) | |
- case <-cs.done: | |
+ cs.bufPipe.CloseWithError(err) | |
} | |
} | |
func (cs *clientStream) cancelStream() { | |
- cs.cc.mu.Lock() | |
+ cc := cs.cc | |
+ cc.mu.Lock() | |
didReset := cs.didReset | |
cs.didReset = true | |
- cs.cc.mu.Unlock() | |
+ cc.mu.Unlock() | |
if !didReset { | |
- cs.cc.writeStreamReset(cs.ID, ErrCodeCancel, nil) | |
+ cc.writeStreamReset(cs.ID, ErrCodeCancel, nil) | |
+ cc.forgetStreamID(cs.ID) | |
} | |
} | |
@@ -329,7 +342,7 @@ | |
} | |
addr := authorityAddr(req.URL.Scheme, req.URL.Host) | |
- for { | |
+ for retry := 0; ; retry++ { | |
cc, err := t.connPool().GetClientConn(req, addr) | |
if err != nil { | |
t.vlogf("http2: Transport failed to get client conn for %s: %v", addr, err) | |
@@ -337,9 +350,25 @@ | |
} | |
traceGotConn(req, cc) | |
res, err := cc.RoundTrip(req) | |
- if err != nil { | |
- if req, err = shouldRetryRequest(req, err); err == nil { | |
- continue | |
+ if err != nil && retry <= 6 { | |
+ afterBodyWrite := false | |
+ if e, ok := err.(afterReqBodyWriteError); ok { | |
+ err = e | |
+ afterBodyWrite = true | |
+ } | |
+ if req, err = shouldRetryRequest(req, err, afterBodyWrite); err == nil { | |
+ // After the first retry, do exponential backoff with 10% jitter. | |
+ if retry == 0 { | |
+ continue | |
+ } | |
+ backoff := float64(uint(1) << (uint(retry) - 1)) | |
+ backoff += backoff * (0.1 * mathrand.Float64()) | |
+ select { | |
+ case <-time.After(time.Second * time.Duration(backoff)): | |
+ continue | |
+ case <-reqContext(req).Done(): | |
+ return nil, reqContext(req).Err() | |
+ } | |
} | |
} | |
if err != nil { | |
@@ -360,43 +389,60 @@ | |
} | |
var ( | |
- errClientConnClosed = errors.New("http2: client conn is closed") | |
- errClientConnUnusable = errors.New("http2: client conn not usable") | |
- | |
- errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY") | |
- errClientConnGotGoAwayAfterSomeReqBody = errors.New("http2: Transport received Server's graceful shutdown GOAWAY; some request body already written") | |
+ errClientConnClosed = errors.New("http2: client conn is closed") | |
+ errClientConnUnusable = errors.New("http2: client conn not usable") | |
+ errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY") | |
) | |
+// afterReqBodyWriteError is a wrapper around errors returned by ClientConn.RoundTrip. | |
+// It is used to signal that err happened after part of Request.Body was sent to the server. | |
+type afterReqBodyWriteError struct { | |
+ err error | |
+} | |
+ | |
+func (e afterReqBodyWriteError) Error() string { | |
+ return e.err.Error() + "; some request body already written" | |
+} | |
+ | |
// shouldRetryRequest is called by RoundTrip when a request fails to get | |
// response headers. It is always called with a non-nil error. | |
// It returns either a request to retry (either the same request, or a | |
// modified clone), or an error if the request can't be replayed. | |
-func shouldRetryRequest(req *http.Request, err error) (*http.Request, error) { | |
- switch err { | |
- default: | |
+func shouldRetryRequest(req *http.Request, err error, afterBodyWrite bool) (*http.Request, error) { | |
+ if !canRetryError(err) { | |
return nil, err | |
- case errClientConnUnusable, errClientConnGotGoAway: | |
+ } | |
+ if !afterBodyWrite { | |
return req, nil | |
- case errClientConnGotGoAwayAfterSomeReqBody: | |
- // If the Body is nil (or http.NoBody), it's safe to reuse | |
- // this request and its Body. | |
- if req.Body == nil || reqBodyIsNoBody(req.Body) { | |
- return req, nil | |
- } | |
- // Otherwise we depend on the Request having its GetBody | |
- // func defined. | |
- getBody := reqGetBody(req) // Go 1.8: getBody = req.GetBody | |
- if getBody == nil { | |
- return nil, errors.New("http2: Transport: peer server initiated graceful shutdown after some of Request.Body was written; define Request.GetBody to avoid this error") | |
- } | |
- body, err := getBody() | |
- if err != nil { | |
- return nil, err | |
- } | |
- newReq := *req | |
- newReq.Body = body | |
- return &newReq, nil | |
} | |
+ // If the Body is nil (or http.NoBody), it's safe to reuse | |
+ // this request and its Body. | |
+ if req.Body == nil || reqBodyIsNoBody(req.Body) { | |
+ return req, nil | |
+ } | |
+ // Otherwise we depend on the Request having its GetBody | |
+ // func defined. | |
+ getBody := reqGetBody(req) // Go 1.8: getBody = req.GetBody | |
+ if getBody == nil { | |
+ return nil, fmt.Errorf("http2: Transport: cannot retry err [%v] after Request.Body was written; define Request.GetBody to avoid this error", err) | |
+ } | |
+ body, err := getBody() | |
+ if err != nil { | |
+ return nil, err | |
+ } | |
+ newReq := *req | |
+ newReq.Body = body | |
+ return &newReq, nil | |
+} | |
+ | |
+func canRetryError(err error) bool { | |
+ if err == errClientConnUnusable || err == errClientConnGotGoAway { | |
+ return true | |
+ } | |
+ if se, ok := err.(StreamError); ok { | |
+ return se.Code == ErrCodeRefusedStream | |
+ } | |
+ return false | |
} | |
func (t *Transport) dialClientConn(addr string, singleUse bool) (*ClientConn, error) { | |
@@ -474,17 +520,18 @@ | |
func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, error) { | |
cc := &ClientConn{ | |
- t: t, | |
- tconn: c, | |
- readerDone: make(chan struct{}), | |
- nextStreamID: 1, | |
- maxFrameSize: 16 << 10, // spec default | |
- initialWindowSize: 65535, // spec default | |
- maxConcurrentStreams: 1000, // "infinite", per spec. 1000 seems good enough. | |
- streams: make(map[uint32]*clientStream), | |
- singleUse: singleUse, | |
- wantSettingsAck: true, | |
- pings: make(map[[8]byte]chan struct{}), | |
+ t: t, | |
+ tconn: c, | |
+ readerDone: make(chan struct{}), | |
+ nextStreamID: 1, | |
+ maxFrameSize: 16 << 10, // spec default | |
+ initialWindowSize: 65535, // spec default | |
+ maxConcurrentStreams: 1000, // "infinite", per spec. 1000 seems good enough. | |
+ peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead. | |
+ streams: make(map[uint32]*clientStream), | |
+ singleUse: singleUse, | |
+ wantSettingsAck: true, | |
+ pings: make(map[[8]byte]chan struct{}), | |
} | |
if d := t.idleConnTimeout(); d != 0 { | |
cc.idleTimeout = d | |
@@ -560,6 +607,8 @@ | |
} | |
} | |
+// CanTakeNewRequest reports whether the connection can take a new request, | |
+// meaning it has not been closed or received or sent a GOAWAY. | |
func (cc *ClientConn) CanTakeNewRequest() bool { | |
cc.mu.Lock() | |
defer cc.mu.Unlock() | |
@@ -571,8 +620,7 @@ | |
return false | |
} | |
return cc.goAway == nil && !cc.closed && | |
- int64(len(cc.streams)+1) < int64(cc.maxConcurrentStreams) && | |
- cc.nextStreamID < math.MaxInt32 | |
+ int64(cc.nextStreamID)+int64(cc.pendingRequests) < math.MaxInt32 | |
} | |
// onIdleTimeout is called from a time.AfterFunc goroutine. It will | |
@@ -694,7 +742,7 @@ | |
// req.ContentLength, where 0 actually means zero (not unknown) and -1 | |
// means unknown. | |
func actualContentLength(req *http.Request) int64 { | |
- if req.Body == nil { | |
+ if req.Body == nil || reqBodyIsNoBody(req.Body) { | |
return 0 | |
} | |
if req.ContentLength != 0 { | |
@@ -718,15 +766,14 @@ | |
hasTrailers := trailers != "" | |
cc.mu.Lock() | |
- cc.lastActive = time.Now() | |
- if cc.closed || !cc.canTakeNewRequestLocked() { | |
+ if err := cc.awaitOpenSlotForRequest(req); err != nil { | |
cc.mu.Unlock() | |
- return nil, errClientConnUnusable | |
+ return nil, err | |
} | |
body := req.Body | |
- hasBody := body != nil | |
contentLen := actualContentLength(req) | |
+ hasBody := contentLen != 0 | |
// TODO(bradfitz): this is a copy of the logic in net/http. Unify somewhere? | |
var requestedGzip bool | |
@@ -816,14 +863,13 @@ | |
cs.abortRequestBodyWrite(errStopReqBodyWrite) | |
} | |
if re.err != nil { | |
- if re.err == errClientConnGotGoAway { | |
- cc.mu.Lock() | |
- if cs.startedWrite { | |
- re.err = errClientConnGotGoAwayAfterSomeReqBody | |
- } | |
- cc.mu.Unlock() | |
- } | |
+ cc.mu.Lock() | |
+ afterBodyWrite := cs.startedWrite | |
+ cc.mu.Unlock() | |
cc.forgetStreamID(cs.ID) | |
+ if afterBodyWrite { | |
+ return nil, afterReqBodyWriteError{re.err} | |
+ } | |
return nil, re.err | |
} | |
res.Request = req | |
@@ -836,31 +882,31 @@ | |
case re := <-readLoopResCh: | |
return handleReadLoopResponse(re) | |
case <-respHeaderTimer: | |
- cc.forgetStreamID(cs.ID) | |
if !hasBody || bodyWritten { | |
cc.writeStreamReset(cs.ID, ErrCodeCancel, nil) | |
} else { | |
bodyWriter.cancel() | |
cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel) | |
} | |
+ cc.forgetStreamID(cs.ID) | |
return nil, errTimeout | |
case <-ctx.Done(): | |
- cc.forgetStreamID(cs.ID) | |
if !hasBody || bodyWritten { | |
cc.writeStreamReset(cs.ID, ErrCodeCancel, nil) | |
} else { | |
bodyWriter.cancel() | |
cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel) | |
} | |
+ cc.forgetStreamID(cs.ID) | |
return nil, ctx.Err() | |
case <-req.Cancel: | |
- cc.forgetStreamID(cs.ID) | |
if !hasBody || bodyWritten { | |
cc.writeStreamReset(cs.ID, ErrCodeCancel, nil) | |
} else { | |
bodyWriter.cancel() | |
cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel) | |
} | |
+ cc.forgetStreamID(cs.ID) | |
return nil, errRequestCanceled | |
case <-cs.peerReset: | |
// processResetStream already removed the | |
@@ -887,6 +933,45 @@ | |
} | |
} | |
+// awaitOpenSlotForRequest waits until len(streams) < maxConcurrentStreams. | |
+// Must hold cc.mu. | |
+func (cc *ClientConn) awaitOpenSlotForRequest(req *http.Request) error { | |
+ var waitingForConn chan struct{} | |
+ var waitingForConnErr error // guarded by cc.mu | |
+ for { | |
+ cc.lastActive = time.Now() | |
+ if cc.closed || !cc.canTakeNewRequestLocked() { | |
+ return errClientConnUnusable | |
+ } | |
+ if int64(len(cc.streams))+1 <= int64(cc.maxConcurrentStreams) { | |
+ if waitingForConn != nil { | |
+ close(waitingForConn) | |
+ } | |
+ return nil | |
+ } | |
+ // Unfortunately, we cannot wait on a condition variable and channel at | |
+ // the same time, so instead, we spin up a goroutine to check if the | |
+ // request is canceled while we wait for a slot to open in the connection. | |
+ if waitingForConn == nil { | |
+ waitingForConn = make(chan struct{}) | |
+ go func() { | |
+ if err := awaitRequestCancel(req, waitingForConn); err != nil { | |
+ cc.mu.Lock() | |
+ waitingForConnErr = err | |
+ cc.cond.Broadcast() | |
+ cc.mu.Unlock() | |
+ } | |
+ }() | |
+ } | |
+ cc.pendingRequests++ | |
+ cc.cond.Wait() | |
+ cc.pendingRequests-- | |
+ if waitingForConnErr != nil { | |
+ return waitingForConnErr | |
+ } | |
+ } | |
+} | |
+ | |
// requires cc.wmu be held | |
func (cc *ClientConn) writeHeaders(streamID uint32, endStream bool, hdrs []byte) error { | |
first := true // first frame written (HEADERS is first, then CONTINUATION) | |
@@ -1002,8 +1087,13 @@ | |
var trls []byte | |
if hasTrailers { | |
cc.mu.Lock() | |
- defer cc.mu.Unlock() | |
- trls = cc.encodeTrailers(req) | |
+ trls, err = cc.encodeTrailers(req) | |
+ cc.mu.Unlock() | |
+ if err != nil { | |
+ cc.writeStreamReset(cs.ID, ErrCodeInternal, err) | |
+ cc.forgetStreamID(cs.ID) | |
+ return err | |
+ } | |
} | |
cc.wmu.Lock() | |
@@ -1106,62 +1196,86 @@ | |
} | |
} | |
- // 8.1.2.3 Request Pseudo-Header Fields | |
- // The :path pseudo-header field includes the path and query parts of the | |
- // target URI (the path-absolute production and optionally a '?' character | |
- // followed by the query production (see Sections 3.3 and 3.4 of | |
- // [RFC3986]). | |
- cc.writeHeader(":authority", host) | |
- cc.writeHeader(":method", req.Method) | |
- if req.Method != "CONNECT" { | |
- cc.writeHeader(":path", path) | |
- cc.writeHeader(":scheme", req.URL.Scheme) | |
- } | |
- if trailers != "" { | |
- cc.writeHeader("trailer", trailers) | |
- } | |
- | |
- var didUA bool | |
- for k, vv := range req.Header { | |
- lowKey := strings.ToLower(k) | |
- switch lowKey { | |
- case "host", "content-length": | |
- // Host is :authority, already sent. | |
- // Content-Length is automatic, set below. | |
- continue | |
- case "connection", "proxy-connection", "transfer-encoding", "upgrade", "keep-alive": | |
- // Per 8.1.2.2 Connection-Specific Header | |
- // Fields, don't send connection-specific | |
- // fields. We have already checked if any | |
- // are error-worthy so just ignore the rest. | |
- continue | |
- case "user-agent": | |
- // Match Go's http1 behavior: at most one | |
- // User-Agent. If set to nil or empty string, | |
- // then omit it. Otherwise if not mentioned, | |
- // include the default (below). | |
- didUA = true | |
- if len(vv) < 1 { | |
+ enumerateHeaders := func(f func(name, value string)) { | |
+ // 8.1.2.3 Request Pseudo-Header Fields | |
+ // The :path pseudo-header field includes the path and query parts of the | |
+ // target URI (the path-absolute production and optionally a '?' character | |
+ // followed by the query production (see Sections 3.3 and 3.4 of | |
+ // [RFC3986]). | |
+ f(":authority", host) | |
+ f(":method", req.Method) | |
+ if req.Method != "CONNECT" { | |
+ f(":path", path) | |
+ f(":scheme", req.URL.Scheme) | |
+ } | |
+ if trailers != "" { | |
+ f("trailer", trailers) | |
+ } | |
+ | |
+ var didUA bool | |
+ for k, vv := range req.Header { | |
+ if strings.EqualFold(k, "host") || strings.EqualFold(k, "content-length") { | |
+ // Host is :authority, already sent. | |
+ // Content-Length is automatic, set below. | |
continue | |
- } | |
- vv = vv[:1] | |
- if vv[0] == "" { | |
+ } else if strings.EqualFold(k, "connection") || strings.EqualFold(k, "proxy-connection") || | |
+ strings.EqualFold(k, "transfer-encoding") || strings.EqualFold(k, "upgrade") || | |
+ strings.EqualFold(k, "keep-alive") { | |
+ // Per 8.1.2.2 Connection-Specific Header | |
+ // Fields, don't send connection-specific | |
+ // fields. We have already checked if any | |
+ // are error-worthy so just ignore the rest. | |
continue | |
+ } else if strings.EqualFold(k, "user-agent") { | |
+ // Match Go's http1 behavior: at most one | |
+ // User-Agent. If set to nil or empty string, | |
+ // then omit it. Otherwise if not mentioned, | |
+ // include the default (below). | |
+ didUA = true | |
+ if len(vv) < 1 { | |
+ continue | |
+ } | |
+ vv = vv[:1] | |
+ if vv[0] == "" { | |
+ continue | |
+ } | |
+ | |
+ } | |
+ | |
+ for _, v := range vv { | |
+ f(k, v) | |
} | |
} | |
- for _, v := range vv { | |
- cc.writeHeader(lowKey, v) | |
+ if shouldSendReqContentLength(req.Method, contentLength) { | |
+ f("content-length", strconv.FormatInt(contentLength, 10)) | |
+ } | |
+ if addGzipHeader { | |
+ f("accept-encoding", "gzip") | |
+ } | |
+ if !didUA { | |
+ f("user-agent", defaultUserAgent) | |
} | |
} | |
- if shouldSendReqContentLength(req.Method, contentLength) { | |
- cc.writeHeader("content-length", strconv.FormatInt(contentLength, 10)) | |
- } | |
- if addGzipHeader { | |
- cc.writeHeader("accept-encoding", "gzip") | |
- } | |
- if !didUA { | |
- cc.writeHeader("user-agent", defaultUserAgent) | |
+ | |
+ // Do a first pass over the headers counting bytes to ensure | |
+ // we don't exceed cc.peerMaxHeaderListSize. This is done as a | |
+ // separate pass before encoding the headers to prevent | |
+ // modifying the hpack state. | |
+ hlSize := uint64(0) | |
+ enumerateHeaders(func(name, value string) { | |
+ hf := hpack.HeaderField{Name: name, Value: value} | |
+ hlSize += uint64(hf.Size()) | |
+ }) | |
+ | |
+ if hlSize > cc.peerMaxHeaderListSize { | |
+ return nil, errRequestHeaderListSize | |
} | |
+ | |
+ // Header list size is ok. Write the headers. | |
+ enumerateHeaders(func(name, value string) { | |
+ cc.writeHeader(strings.ToLower(name), value) | |
+ }) | |
+ | |
return cc.hbuf.Bytes(), nil | |
} | |
@@ -1188,17 +1302,29 @@ | |
} | |
// requires cc.mu be held. | |
-func (cc *ClientConn) encodeTrailers(req *http.Request) []byte { | |
+func (cc *ClientConn) encodeTrailers(req *http.Request) ([]byte, error) { | |
cc.hbuf.Reset() | |
+ | |
+ hlSize := uint64(0) | |
+ for k, vv := range req.Trailer { | |
+ for _, v := range vv { | |
+ hf := hpack.HeaderField{Name: k, Value: v} | |
+ hlSize += uint64(hf.Size()) | |
+ } | |
+ } | |
+ if hlSize > cc.peerMaxHeaderListSize { | |
+ return nil, errRequestHeaderListSize | |
+ } | |
+ | |
for k, vv := range req.Trailer { | |
- // Transfer-Encoding, etc.. have already been filter at the | |
+ // Transfer-Encoding, etc.. have already been filtered at the | |
// start of RoundTrip | |
lowKey := strings.ToLower(k) | |
for _, v := range vv { | |
cc.writeHeader(lowKey, v) | |
} | |
} | |
- return cc.hbuf.Bytes() | |
+ return cc.hbuf.Bytes(), nil | |
} | |
func (cc *ClientConn) writeHeader(name, value string) { | |
@@ -1246,7 +1372,9 @@ | |
cc.idleTimer.Reset(cc.idleTimeout) | |
} | |
close(cs.done) | |
- cc.cond.Broadcast() // wake up checkResetOrDone via clientStream.awaitFlowControl | |
+ // Wake up checkResetOrDone via clientStream.awaitFlowControl and | |
+ // wake up RoundTrip if there is a pending request. | |
+ cc.cond.Broadcast() | |
} | |
return cs | |
} | |
@@ -1345,8 +1473,9 @@ | |
cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err) | |
} | |
if se, ok := err.(StreamError); ok { | |
- if cs := cc.streamByID(se.StreamID, true /*ended; remove it*/); cs != nil { | |
+ if cs := cc.streamByID(se.StreamID, false); cs != nil { | |
cs.cc.writeStreamReset(cs.ID, se.Code, err) | |
+ cs.cc.forgetStreamID(cs.ID) | |
if se.Cause == nil { | |
se.Cause = cc.fr.errDetail | |
} | |
@@ -1655,6 +1784,7 @@ | |
cc.wmu.Lock() | |
if !serverSentStreamEnd { | |
cc.fr.WriteRSTStream(cs.ID, ErrCodeCancel) | |
+ cs.didReset = true | |
} | |
// Return connection-level flow control. | |
if unread > 0 { | |
@@ -1667,6 +1797,7 @@ | |
} | |
cs.bufPipe.BreakWithError(errClosedResponseBody) | |
+ cc.forgetStreamID(cs.ID) | |
return nil | |
} | |
@@ -1701,13 +1832,15 @@ | |
} | |
return nil | |
} | |
+ if !cs.firstByte { | |
+ cc.logf("protocol error: received DATA before a HEADERS frame") | |
+ rl.endStreamError(cs, StreamError{ | |
+ StreamID: f.StreamID, | |
+ Code: ErrCodeProtocol, | |
+ }) | |
+ return nil | |
+ } | |
if f.Length > 0 { | |
- if len(data) > 0 && cs.bufPipe.b == nil { | |
- // Data frame after it's already closed? | |
- cc.logf("http2: Transport received DATA frame for closed stream; closing connection") | |
- return ConnectionError(ErrCodeProtocol) | |
- } | |
- | |
// Check connection-level flow control. | |
cc.mu.Lock() | |
if cs.inflow.available() >= int32(f.Length) { | |
@@ -1718,16 +1851,27 @@ | |
} | |
// Return any padded flow control now, since we won't | |
// refund it later on body reads. | |
- if pad := int32(f.Length) - int32(len(data)); pad > 0 { | |
- cs.inflow.add(pad) | |
- cc.inflow.add(pad) | |
+ var refund int | |
+ if pad := int(f.Length) - len(data); pad > 0 { | |
+ refund += pad | |
+ } | |
+ // Return len(data) now if the stream is already closed, | |
+ // since data will never be read. | |
+ didReset := cs.didReset | |
+ if didReset { | |
+ refund += len(data) | |
+ } | |
+ if refund > 0 { | |
+ cc.inflow.add(int32(refund)) | |
cc.wmu.Lock() | |
- cc.fr.WriteWindowUpdate(0, uint32(pad)) | |
- cc.fr.WriteWindowUpdate(cs.ID, uint32(pad)) | |
+ cc.fr.WriteWindowUpdate(0, uint32(refund)) | |
+ if !didReset { | |
+ cs.inflow.add(int32(refund)) | |
+ cc.fr.WriteWindowUpdate(cs.ID, uint32(refund)) | |
+ } | |
cc.bw.Flush() | |
cc.wmu.Unlock() | |
} | |
- didReset := cs.didReset | |
cc.mu.Unlock() | |
if len(data) > 0 && !didReset { | |
@@ -1810,6 +1954,8 @@ | |
cc.maxFrameSize = s.Val | |
case SettingMaxConcurrentStreams: | |
cc.maxConcurrentStreams = s.Val | |
+ case SettingMaxHeaderListSize: | |
+ cc.peerMaxHeaderListSize = uint64(s.Val) | |
case SettingInitialWindowSize: | |
// Values above the maximum flow-control | |
// window size of 2^31-1 MUST be treated as a | |
@@ -1976,6 +2122,7 @@ | |
var ( | |
errResponseHeaderListSize = errors.New("http2: response header list larger than advertised limit") | |
+ errRequestHeaderListSize = errors.New("http2: request header list larger than peer's advertised limit") | |
errPseudoTrailers = errors.New("http2: invalid pseudo header in trailers") | |
) | |
@@ -2125,4 +2272,4 @@ | |
// connection for a single request and then close the connection. | |
func isConnectionCloseRequest(req *http.Request) bool { | |
return req.Close || httplex.HeaderValuesContainsToken(req.Header["Connection"], "close") | |
-} | |
+} | |
\ No newline at end of file |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment