Skip to content

Instantly share code, notes, and snippets.

@PaulFurtado
Created February 17, 2021 03:56
Show Gist options
  • Save PaulFurtado/763700624dfcb75fdcccb1a305794fb5 to your computer and use it in GitHub Desktop.
Save PaulFurtado/763700624dfcb75fdcccb1a305794fb5 to your computer and use it in GitHub Desktop.
diff --git a/staging/src/k8s.io/apimachinery/pkg/util/net/http.go b/staging/src/k8s.io/apimachinery/pkg/util/net/http.go
index c24fbc6921c..e499f2a18e0 100644
--- a/staging/src/k8s.io/apimachinery/pkg/util/net/http.go
+++ b/staging/src/k8s.io/apimachinery/pkg/util/net/http.go
@@ -30,6 +30,7 @@ import (
"path"
"strconv"
"strings"
+ "time"
"golang.org/x/net/http2"
"k8s.io/klog"
@@ -121,13 +122,61 @@ func SetTransportDefaults(t *http.Transport) *http.Transport {
if s := os.Getenv("DISABLE_HTTP2"); len(s) > 0 {
klog.Infof("HTTP2 has been explicitly disabled")
} else if allowsHTTP2(t) {
- if err := http2.ConfigureTransport(t); err != nil {
+ if err := configureHTTP2Transport(t); err != nil {
klog.Warningf("Transport failed http2 configuration: %v", err)
}
}
return t
}
+func readIdleTimeoutSeconds() int {
+ ret := 30
+ // Users can set HTTP2_READ_IDLE_TIMEOUT_SECONDS to 0 to disable the
+ // HTTP/2 connection health check.
+ if s := os.Getenv("HTTP2_READ_IDLE_TIMEOUT_SECONDS"); len(s) > 0 {
+ i, err := strconv.Atoi(s)
+ if err != nil {
+ klog.Warningf("Illegal HTTP2_READ_IDLE_TIMEOUT_SECONDS(%q): %v."+
+ " Default value %d is used", s, err, ret)
+ return ret
+ }
+ ret = i
+ }
+ return ret
+}
+
+func pingTimeoutSeconds() int {
+ ret := 15
+ if s := os.Getenv("HTTP2_PING_TIMEOUT_SECONDS"); len(s) > 0 {
+ i, err := strconv.Atoi(s)
+ if err != nil {
+ klog.Warningf("Illegal HTTP2_PING_TIMEOUT_SECONDS(%q): %v."+
+ " Default value %d is used", s, err, ret)
+ return ret
+ }
+ ret = i
+ }
+ return ret
+}
+
+func configureHTTP2Transport(t *http.Transport) error {
+ t2, err := http2.ConfigureTransports(t)
+ if err != nil {
+ return err
+ }
+ // The following enables the HTTP/2 connection health check added in
+ // https://github.com/golang/net/pull/55. The health check detects and
+ // closes broken transport layer connections. Without the health check,
+ // a broken connection can linger too long, e.g., a broken TCP
+ // connection will be closed by the Linux kernel after 13 to 30 minutes
+ // by default, which caused
+ // https://github.com/kubernetes/client-go/issues/374 and
+ // https://github.com/kubernetes/kubernetes/issues/87615.
+ t2.ReadIdleTimeout = time.Duration(readIdleTimeoutSeconds()) * time.Second
+ t2.PingTimeout = time.Duration(pingTimeoutSeconds()) * time.Second
+ return nil
+}
+
func allowsHTTP2(t *http.Transport) bool {
if t.TLSClientConfig == nil || len(t.TLSClientConfig.NextProtos) == 0 {
// the transport expressed no NextProto preference, allow
diff --git a/staging/src/k8s.io/apimachinery/pkg/util/net/http_test.go b/staging/src/k8s.io/apimachinery/pkg/util/net/http_test.go
index 142b80f1a84..c057b99530e 100644
--- a/staging/src/k8s.io/apimachinery/pkg/util/net/http_test.go
+++ b/staging/src/k8s.io/apimachinery/pkg/util/net/http_test.go
@@ -492,3 +492,39 @@ func TestAllowsHTTP2(t *testing.T) {
})
}
}
+
+func setEnv(key, value string) func() {
+ originalValue := os.Getenv(key)
+ os.Setenv(key, value)
+ return func() {
+ os.Setenv(key, originalValue)
+ }
+}
+
+func TestReadIdleTimeoutSeconds(t *testing.T) {
+ reset := setEnv("HTTP2_READ_IDLE_TIMEOUT_SECONDS", "60")
+ if e, a := 60, readIdleTimeoutSeconds(); e != a {
+ t.Errorf("expected %d, got %d", e, a)
+ }
+ reset()
+
+ reset = setEnv("HTTP2_READ_IDLE_TIMEOUT_SECONDS", "illegal value")
+ if e, a := 30, readIdleTimeoutSeconds(); e != a {
+ t.Errorf("expected %d, got %d", e, a)
+ }
+ reset()
+}
+
+func TestPingTimeoutSeconds(t *testing.T) {
+ reset := setEnv("HTTP2_PING_TIMEOUT_SECONDS", "60")
+ if e, a := 60, pingTimeoutSeconds(); e != a {
+ t.Errorf("expected %d, got %d", e, a)
+ }
+ reset()
+
+ reset = setEnv("HTTP2_PING_TIMEOUT_SECONDS", "illegal value")
+ if e, a := 15, pingTimeoutSeconds(); e != a {
+ t.Errorf("expected %d, got %d", e, a)
+ }
+ reset()
+}
diff --git a/staging/src/k8s.io/client-go/rest/BUILD b/staging/src/k8s.io/client-go/rest/BUILD
index 61f77e28eca..1f37ef5867d 100644
--- a/staging/src/k8s.io/client-go/rest/BUILD
+++ b/staging/src/k8s.io/client-go/rest/BUILD
@@ -11,6 +11,7 @@ go_test(
srcs = [
"client_test.go",
"config_test.go",
+ "connection_test.go",
"plugin_test.go",
"request_test.go",
"url_utils_test.go",
@@ -25,12 +26,14 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
+ "//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer/streaming:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/httpstream:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
+ "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//staging/src/k8s.io/client-go/rest/watch:go_default_library",
diff --git a/staging/src/k8s.io/client-go/rest/connection_test.go b/staging/src/k8s.io/client-go/rest/connection_test.go
new file mode 100644
index 00000000000..f6695b8cb14
--- /dev/null
+++ b/staging/src/k8s.io/client-go/rest/connection_test.go
@@ -0,0 +1,161 @@
+/*
+Copyright 2019 The Kubernetes Authors.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package rest
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "net"
+ "net/http"
+ "net/http/httptest"
+ "net/url"
+ "os"
+ "strconv"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "k8s.io/apimachinery/pkg/runtime/schema"
+ "k8s.io/apimachinery/pkg/runtime/serializer"
+ utilnet "k8s.io/apimachinery/pkg/util/net"
+)
+
+type tcpLB struct {
+ t *testing.T
+ ln net.Listener
+ serverURL string
+ dials int32
+}
+
+func (lb *tcpLB) handleConnection(in net.Conn, stopCh chan struct{}) {
+ out, err := net.Dial("tcp", lb.serverURL)
+ if err != nil {
+ lb.t.Log(err)
+ return
+ }
+ go io.Copy(out, in)
+ go io.Copy(in, out)
+ <-stopCh
+ if err := out.Close(); err != nil {
+ lb.t.Fatalf("failed to close connection: %v", err)
+ }
+}
+
+func (lb *tcpLB) serve(stopCh chan struct{}) {
+ conn, err := lb.ln.Accept()
+ if err != nil {
+ lb.t.Fatalf("failed to accept: %v", err)
+ }
+ atomic.AddInt32(&lb.dials, 1)
+ go lb.handleConnection(conn, stopCh)
+}
+
+func newLB(t *testing.T, serverURL string) *tcpLB {
+ ln, err := net.Listen("tcp", "127.0.0.1:0")
+ if err != nil {
+ t.Fatalf("failed to bind: %v", err)
+ }
+ lb := tcpLB{
+ serverURL: serverURL,
+ ln: ln,
+ t: t,
+ }
+ return &lb
+}
+
+func setEnv(key, value string) func() {
+ originalValue := os.Getenv(key)
+ os.Setenv(key, value)
+ return func() {
+ os.Setenv(key, originalValue)
+ }
+}
+
+const (
+ readIdleTimeout int = 1
+ pingTimeout int = 1
+)
+
+func TestReconnectBrokenTCP(t *testing.T) {
+ defer setEnv("HTTP2_READ_IDLE_TIMEOUT_SECONDS", strconv.Itoa(readIdleTimeout))()
+ defer setEnv("HTTP2_PING_TIMEOUT_SECONDS", strconv.Itoa(pingTimeout))()
+ defer setEnv("DISABLE_HTTP2", "")()
+ ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ fmt.Fprintf(w, "Hello, %s", r.Proto)
+ }))
+ ts.EnableHTTP2 = true
+ ts.StartTLS()
+ defer ts.Close()
+
+ u, err := url.Parse(ts.URL)
+ if err != nil {
+ t.Fatalf("failed to parse URL from %q: %v", ts.URL, err)
+ }
+ lb := newLB(t, u.Host)
+ defer lb.ln.Close()
+ stopCh := make(chan struct{})
+ go lb.serve(stopCh)
+ transport, ok := ts.Client().Transport.(*http.Transport)
+ if !ok {
+ t.Fatalf("failed to assert *http.Transport")
+ }
+ config := &Config{
+ Host: "https://" + lb.ln.Addr().String(),
+ Transport: utilnet.SetTransportDefaults(transport),
+ Timeout: 1 * time.Second,
+ // These fields are required to create a REST client.
+ ContentConfig: ContentConfig{
+ GroupVersion: &schema.GroupVersion{},
+ NegotiatedSerializer: &serializer.CodecFactory{},
+ },
+ }
+ client, err := RESTClientFor(config)
+ if err != nil {
+ t.Fatalf("failed to create REST client: %v", err)
+ }
+ data, err := client.Get().AbsPath("/").DoRaw(context.TODO())
+ if err != nil {
+ t.Fatalf("unexpected err: %s: %v", data, err)
+ }
+ if string(data) != "Hello, HTTP/2.0" {
+ t.Fatalf("unexpected response: %s", data)
+ }
+
+ // Deliberately let the LB stop proxying traffic for the current
+ // connection. This mimics a broken TCP connection that's not properly
+ // closed.
+ close(stopCh)
+
+ stopCh = make(chan struct{})
+ go lb.serve(stopCh)
+ // Sleep enough time for the HTTP/2 health check to detect and close
+ // the broken TCP connection.
+ time.Sleep(time.Duration(1+readIdleTimeout+pingTimeout) * time.Second)
+ // If the HTTP/2 health check were disabled, the broken connection
+ // would still be in the connection pool, the following request would
+ // then reuse the broken connection instead of creating a new one, and
+ // thus would fail.
+ data, err = client.Get().AbsPath("/").DoRaw(context.TODO())
+ if err != nil {
+ t.Fatalf("unexpected err: %v", err)
+ }
+ if string(data) != "Hello, HTTP/2.0" {
+ t.Fatalf("unexpected response: %s", data)
+ }
+ dials := atomic.LoadInt32(&lb.dials)
+ if dials != 2 {
+ t.Fatalf("expected %d dials, got %d", 2, dials)
+ }
+}
diff --git a/vendor/golang.org/x/net/http2/transport.go b/vendor/golang.org/x/net/http2/transport.go
index aeac7d8a51a..d0a35d53fd4 100644
--- a/vendor/golang.org/x/net/http2/transport.go
+++ b/vendor/golang.org/x/net/http2/transport.go
@@ -108,6 +108,19 @@ type Transport struct {
// waiting for their turn.
StrictMaxConcurrentStreams bool
+ // ReadIdleTimeout is the timeout after which a health check using a ping
+ // frame will be carried out if no frame is received on the connection.
+ // Note that a ping response is considered a received frame, so if
+ // there is no other traffic on the connection, the health check will
+ // be performed every ReadIdleTimeout interval.
+ // If zero, no health check is performed.
+ ReadIdleTimeout time.Duration
+
+ // PingTimeout is the timeout after which the connection will be closed
+ // if a response to Ping is not received.
+ // Defaults to 15s.
+ PingTimeout time.Duration
+
// t1, if non-nil, is the standard library Transport using
// this transport. Its settings are used (but not its
// RoundTrip method, etc).
@@ -131,14 +144,31 @@ func (t *Transport) disableCompression() bool {
return t.DisableCompression || (t.t1 != nil && t.t1.DisableCompression)
}
+func (t *Transport) pingTimeout() time.Duration {
+ if t.PingTimeout == 0 {
+ return 15 * time.Second
+ }
+ return t.PingTimeout
+
+}
+
// ConfigureTransport configures a net/http HTTP/1 Transport to use HTTP/2.
// It returns an error if t1 has already been HTTP/2-enabled.
+//
+// Use ConfigureTransports instead to configure the HTTP/2 Transport.
func ConfigureTransport(t1 *http.Transport) error {
- _, err := configureTransport(t1)
+ _, err := ConfigureTransports(t1)
return err
}
-func configureTransport(t1 *http.Transport) (*Transport, error) {
+// ConfigureTransports configures a net/http HTTP/1 Transport to use HTTP/2.
+// It returns a new HTTP/2 Transport for further configuration.
+// It returns an error if t1 has already been HTTP/2-enabled.
+func ConfigureTransports(t1 *http.Transport) (*Transport, error) {
+ return configureTransports(t1)
+}
+
+func configureTransports(t1 *http.Transport) (*Transport, error) {
connPool := new(clientConnPool)
t2 := &Transport{
ConnPool: noDialClientConnPool{connPool},
@@ -667,6 +697,7 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
cc.inflow.add(transportDefaultConnFlow + initialWindowSize)
cc.bw.Flush()
if cc.werr != nil {
+ cc.Close()
return nil, cc.werr
}
@@ -674,6 +705,20 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
return cc, nil
}
+func (cc *ClientConn) healthCheck() {
+ pingTimeout := cc.t.pingTimeout()
+ // We don't need to periodically ping in the health check, because the readLoop of ClientConn will
+ // trigger the healthCheck again if there is no frame received.
+ ctx, cancel := context.WithTimeout(context.Background(), pingTimeout)
+ defer cancel()
+ err := cc.Ping(ctx)
+ if err != nil {
+ cc.closeForLostPing()
+ cc.t.connPool().MarkDead(cc)
+ return
+ }
+}
+
func (cc *ClientConn) setGoAway(f *GoAwayFrame) {
cc.mu.Lock()
defer cc.mu.Unlock()
@@ -834,14 +879,12 @@ func (cc *ClientConn) sendGoAway() error {
return nil
}
-// Close closes the client connection immediately.
-//
-// In-flight requests are interrupted. For a graceful shutdown, use Shutdown instead.
-func (cc *ClientConn) Close() error {
+// closeForError closes the client connection immediately. In-flight requests
+// are interrupted, and err is sent to their streams.
+func (cc *ClientConn) closeForError(err error) error {
cc.mu.Lock()
defer cc.cond.Broadcast()
defer cc.mu.Unlock()
- err := errors.New("http2: client connection force closed via ClientConn.Close")
for id, cs := range cc.streams {
select {
case cs.resc <- resAndError{err: err}:
@@ -854,6 +897,20 @@ func (cc *ClientConn) Close() error {
return cc.tconn.Close()
}
+// Close closes the client connection immediately.
+//
+// In-flight requests are interrupted. For a graceful shutdown, use Shutdown instead.
+func (cc *ClientConn) Close() error {
+ err := errors.New("http2: client connection force closed via ClientConn.Close")
+ return cc.closeForError(err)
+}
+
+// closeForLostPing closes the client connection immediately after a failed health-check ping. In-flight requests are interrupted.
+func (cc *ClientConn) closeForLostPing() error {
+ err := errors.New("http2: client connection lost")
+ return cc.closeForError(err)
+}
+
const maxAllocFrameSize = 512 << 10
// frameBuffer returns a scratch buffer suitable for writing DATA frames.
@@ -1021,6 +1078,15 @@ func (cc *ClientConn) roundTrip(req *http.Request) (res *http.Response, gotErrAf
bodyWriter := cc.t.getBodyWriterState(cs, body)
cs.on100 = bodyWriter.on100
+ defer func() {
+ cc.wmu.Lock()
+ werr := cc.werr
+ cc.wmu.Unlock()
+ if werr != nil {
+ cc.Close()
+ }
+ }()
+
cc.wmu.Lock()
endStream := !hasBody && !hasTrailers
werr := cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs)
@@ -1682,8 +1748,17 @@ func (rl *clientConnReadLoop) run() error {
rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse
gotReply := false // ever saw a HEADERS reply
gotSettings := false
+ readIdleTimeout := cc.t.ReadIdleTimeout
+ var t *time.Timer
+ if readIdleTimeout != 0 {
+ t = time.AfterFunc(readIdleTimeout, cc.healthCheck)
+ defer t.Stop()
+ }
for {
f, err := cc.fr.ReadFrame()
+ if t != nil {
+ t.Reset(readIdleTimeout)
+ }
if err != nil {
cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment