Skip to content

Instantly share code, notes, and snippets.

@ncdc
Created August 26, 2015 17:13
Show Gist options
  • Save ncdc/22d5692b61d0130eed07 to your computer and use it in GitHub Desktop.
Save ncdc/22d5692b61d0130eed07 to your computer and use it in GitHub Desktop.
commit 7f0637c4b5703b8d2df0ac535f69addb5703820f
Author: Andy Goldstein <agoldste@redhat.com>
Date: Wed Aug 5 12:46:11 2015 -0400
Correct port-forward data copying logic
Correct port-forward data copying logic so that the server closes its
half of the data stream when socat exits, and the client closes its half
of the data stream when it finishes writing.
Modify the client to wait for both copies (client->server,
server->client) to finish before it unblocks.
diff --git a/contrib/for-tests/port-forward-tester/.gitignore b/contrib/for-tests/port-forward-tester/.gitignore
new file mode 100644
index 0000000..eb23f7e
--- /dev/null
+++ b/contrib/for-tests/port-forward-tester/.gitignore
@@ -0,0 +1 @@
+pftester
diff --git a/contrib/for-tests/port-forward-tester/Dockerfile b/contrib/for-tests/port-forward-tester/Dockerfile
new file mode 100644
index 0000000..239cf57
--- /dev/null
+++ b/contrib/for-tests/port-forward-tester/Dockerfile
@@ -0,0 +1,17 @@
+# Copyright 2015 The Kubernetes Authors All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM scratch
+ADD pftester pftester
+ENTRYPOINT ["/pftester"]
diff --git a/contrib/for-tests/port-forward-tester/Makefile b/contrib/for-tests/port-forward-tester/Makefile
new file mode 100644
index 0000000..aa479d5
--- /dev/null
+++ b/contrib/for-tests/port-forward-tester/Makefile
@@ -0,0 +1,15 @@
+all: push
+
+TAG = 0.1
+
+pftester: pftester.go
+ CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -ldflags '-w' ./pftester.go
+
+image: pftester
+ sudo docker build -t ncdc/pftester:$(TAG) .
+
+push: image
+ sudo docker push ncdc/pftester:$(TAG)
+
+clean:
+ rm -f pftester
diff --git a/contrib/for-tests/port-forward-tester/pftester.go b/contrib/for-tests/port-forward-tester/pftester.go
new file mode 100644
index 0000000..ef8be40
--- /dev/null
+++ b/contrib/for-tests/port-forward-tester/pftester.go
@@ -0,0 +1,93 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+ "fmt"
+ "net"
+ "os"
+ "strconv"
+ "strings"
+ "time"
+)
+
+func getEnvInt(name string) int {
+ s := os.Getenv(name)
+ value, err := strconv.Atoi(s)
+ if err != nil {
+ fmt.Printf("Error parsing %s %q: %v\n", name, s, err)
+ os.Exit(1)
+ }
+ return value
+}
+
+func main() {
+ bindPort := os.Getenv("BIND_PORT")
+ listener, err := net.Listen("tcp", fmt.Sprintf("localhost:%s", bindPort))
+ if err != nil {
+ fmt.Printf("Error listening: %v\n", err)
+ os.Exit(1)
+ }
+
+ conn, err := listener.Accept()
+ if err != nil {
+ fmt.Printf("Error accepting connection: %v\n", err)
+ os.Exit(1)
+ }
+ defer conn.Close()
+ fmt.Println("Accepted client connection")
+
+ expectedClientData := os.Getenv("EXPECTED_CLIENT_DATA")
+ if len(expectedClientData) > 0 {
+ buf := make([]byte, len(expectedClientData))
+ read, err := conn.Read(buf)
+ if read != len(expectedClientData) {
+ fmt.Printf("Expected to read %d bytes from client, but got %d instead. err=%v\n", len(expectedClientData), read, err)
+ os.Exit(2)
+ }
+ if expectedClientData != string(buf) {
+ fmt.Printf("Expect to read %q, but got %q. err=%v\n", expectedClientData, string(buf), err)
+ os.Exit(3)
+ }
+ if err != nil {
+ fmt.Printf("Read err: %v\n", err)
+ }
+ }
+
+ chunks := getEnvInt("CHUNKS")
+ chunkSize := getEnvInt("CHUNK_SIZE")
+ chunkInterval := getEnvInt("CHUNK_INTERVAL")
+
+ stringData := strings.Repeat("x", chunkSize)
+ data := []byte(stringData)
+
+ for i := 0; i < chunks; i++ {
+ written, err := conn.Write(data)
+ if written != chunkSize {
+ fmt.Printf("Expected to write %d bytes from client, but wrote %d instead. err=%v\n", chunkSize, written, err)
+ os.Exit(4)
+ }
+ if err != nil {
+ fmt.Printf("Write err: %v\n", err)
+ }
+ if i+1 < chunks {
+ time.Sleep(time.Duration(chunkInterval) * time.Second)
+ }
+ }
+
+ fmt.Println("Done")
+}
diff --git a/pkg/client/portforward/portforward.go b/pkg/client/portforward/portforward.go
index a7dfe15..6efd163 100644
--- a/pkg/client/portforward/portforward.go
+++ b/pkg/client/portforward/portforward.go
@@ -285,26 +285,38 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
defer dataStream.Reset()
go func() {
- // Copy from the remote side to the local port. We won't get an EOF from
- // the server as it has no way of knowing when to close the stream. We'll
- // take care of closing both ends of the stream with the call to
- // stream.Reset() when this function exits.
- if _, err := io.Copy(conn, dataStream); err != nil && err != io.EOF && !strings.Contains(err.Error(), "use of closed network connection") {
+ // Copy from the remote side to the local port.
+ _, err := io.Copy(conn, dataStream)
+ if err != nil {
glog.Errorf("Error copying from remote stream to local connection: %v", err)
}
+
+ // inform the select below that the remote copy is done
doneChan <- struct{}{}
}()
go func() {
- // Copy from the local port to the remote side. Here we will be able to know
- // when the Copy gets an EOF from conn, as that will happen as soon as conn is
- // closed (i.e. client disconnected).
- if _, err := io.Copy(dataStream, conn); err != nil && err != io.EOF && !strings.Contains(err.Error(), "use of closed network connection") {
- glog.Errorf("Error copying from local connection to remote stream: %v", err)
+ // Copy from the local port to the remote side.
+ _, err := io.Copy(dataStream, conn)
+
+ // inform server we're not sending any more data
+ dataStream.Close()
+
+ if err == nil {
+ // clean copy - wait for server to finish
+ return
}
+
+ glog.Errorf("Error copying from local connection to remote stream: %v", err)
+
+ // shutdown the stream in both directions
+ dataStream.Reset()
+
+ // break out of the select below
doneChan <- struct{}{}
}()
+ // wait for either an error or for copying from the remote side to finish
select {
case err := <-errorChan:
glog.Error(err)
diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go
index b845191..06eb2e8 100644
--- a/pkg/kubelet/dockertools/manager.go
+++ b/pkg/kubelet/dockertools/manager.go
@@ -1076,6 +1076,10 @@ func noPodInfraContainerError(podName, podNamespace string) error {
// - should we support nsenter + socat on the host? (current impl)
// - should we support nsenter + socat in a container, running with elevated privs and --pid=host?
func (dm *DockerManager) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error {
+ // We need to close our half of the stream when socat exits so the client
+ // knows we aren't going to send any more data
+ defer stream.Close()
+
podInfraContainer := pod.FindContainerByName(PodInfraContainerName)
if podInfraContainer == nil {
return noPodInfraContainerError(pod.Name, pod.Namespace)
@@ -1090,16 +1094,39 @@ func (dm *DockerManager) PortForward(pod *kubecontainer.Pod, port uint16, stream
}
containerPid := container.State.Pid
- // TODO what if the host doesn't have it???
_, lookupErr := exec.LookPath("socat")
if lookupErr != nil {
- return fmt.Errorf("Unable to do port forwarding: socat not found.")
+ return fmt.Errorf("unable to do port forwarding: socat not found.")
}
+
args := []string{"-t", fmt.Sprintf("%d", containerPid), "-n", "socat", "-", fmt.Sprintf("TCP4:localhost:%d", port)}
- // TODO use exec.LookPath
+
+ _, lookupErr = exec.LookPath("nsenter")
+ if lookupErr != nil {
+ return fmt.Errorf("unable to do port forwarding: nsenter not found.")
+ }
+
command := exec.Command("nsenter", args...)
- command.Stdin = stream
command.Stdout = stream
+
+ // If we use Stdin, command.Run() won't return until the goroutine that's copying
+ // from stream finishes. Unfortunately, if you have a client like telnet connected
+ // via port forwarding, as long as the user's telnet client is connected to the user's
+ // local listener that port forwarding sets up, the telnet session never exits. This
+ // means that even if socat has finished running, command.Run() won't ever return
+ // (because the client still has the connection and stream open).
+ //
+ // The work around is to use StdinPipe(), as Wait() (called by Run()) closes the pipe
+ // when the command (socat) exits.
+ inPipe, err := command.StdinPipe()
+ if err != nil {
+ return fmt.Errorf("unable to do port forwarding: error creating stdin pipe: %v", err)
+ }
+ go func() {
+ io.Copy(inPipe, stream)
+ inPipe.Close()
+ }()
+
return command.Run()
}
diff --git a/pkg/kubelet/dockertools/manager_test.go b/pkg/kubelet/dockertools/manager_test.go
index 137988c..be2c50b 100644
--- a/pkg/kubelet/dockertools/manager_test.go
+++ b/pkg/kubelet/dockertools/manager_test.go
@@ -1926,6 +1926,12 @@ func TestSyncPodEventHandlerFails(t *testing.T) {
}
}
+type fakeReadWriteCloser struct{}
+
+func (*fakeReadWriteCloser) Read([]byte) (int, error) { return 0, nil }
+func (*fakeReadWriteCloser) Write([]byte) (int, error) { return 0, nil }
+func (*fakeReadWriteCloser) Close() error { return nil }
+
func TestPortForwardNoSuchContainer(t *testing.T) {
dm, _ := newTestDockerManager()
@@ -1938,7 +1944,8 @@ func TestPortForwardNoSuchContainer(t *testing.T) {
Containers: nil,
},
5000,
- nil,
+ // need a valid io.ReadWriteCloser here
+ &fakeReadWriteCloser{},
)
if err == nil {
t.Fatal("unexpected non-error")
diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go
index 4092f38..5d9908e 100644
--- a/pkg/kubelet/rkt/rkt.go
+++ b/pkg/kubelet/rkt/rkt.go
@@ -1060,6 +1060,10 @@ func (r *runtime) findRktID(pod *kubecontainer.Pod) (string, error) {
func (r *runtime) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error {
glog.V(4).Infof("Rkt port forwarding in container.")
+ // We need to close our half of the stream when socat exits so the client
+ // knows we aren't going to send any more data
+ defer stream.Close()
+
podInfos, err := r.getPodInfos()
if err != nil {
return err
@@ -1082,15 +1086,32 @@ func (r *runtime) PortForward(pod *kubecontainer.Pod, port uint16, stream io.Rea
if lookupErr != nil {
return fmt.Errorf("unable to do port forwarding: socat not found.")
}
+
args := []string{"-t", fmt.Sprintf("%d", info.pid), "-n", "socat", "-", fmt.Sprintf("TCP4:localhost:%d", port)}
_, lookupErr = exec.LookPath("nsenter")
if lookupErr != nil {
return fmt.Errorf("unable to do port forwarding: nsenter not found.")
}
+
command := exec.Command("nsenter", args...)
- command.Stdin = stream
command.Stdout = stream
+
+ // If we use Stdin, command.Run() won't return until the goroutine that's copying
+ // from stream finishes. Unfortunately, if you have a client like telnet connected
+ // via port forwarding, as long as the user's telnet client is connected to the user's
+ // local listener that port forwarding sets up, the telnet session never exits. This
+ // means that even if socat has finished running, command.Run() won't ever return
+ // (because the client still has the connection and stream open).
+ //
+ // The work around is to use StdinPipe(), as Wait() (called by Run()) closes the pipe
+ // when the command (socat) exits.
+ inPipe, err := command.StdinPipe()
+ if err != nil {
+ return fmt.Errorf("unable to do port forwarding: error creating stdin pipe: %v", err)
+ }
+ go io.Copy(inPipe, stream)
+
return command.Run()
}
diff --git a/test/e2e/portforward.go b/test/e2e/portforward.go
new file mode 100644
index 0000000..5cf4f75
--- /dev/null
+++ b/test/e2e/portforward.go
@@ -0,0 +1,180 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package e2e
+
+import (
+ "fmt"
+ "net"
+ "os/exec"
+ "strconv"
+ "strings"
+
+ "k8s.io/kubernetes/pkg/api"
+ "k8s.io/kubernetes/pkg/client"
+
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+)
+
+const (
+ portForwardTestImage = "gcr.io/google_containers/pftest"
+ podName = "pfpod"
+)
+
+func pfPod(expectedClientData, chunks, chunkSize, chunkInterval string) *api.Pod {
+ return &api.Pod{
+ ObjectMeta: api.ObjectMeta{
+ Name: podName,
+ Labels: map[string]string{"name": podName},
+ },
+ Spec: api.PodSpec{
+ Containers: []api.Container{
+ {
+ Name: "pf",
+ //Image: "gcr.io/google_containers/pftester",
+ Image: "ncdc/pftester:0.1",
+ Env: []api.EnvVar{
+ {
+ Name: "BIND_PORT",
+ Value: "80",
+ },
+ {
+ Name: "EXPECTED_CLIENT_DATA",
+ Value: expectedClientData,
+ },
+ {
+ Name: "CHUNKS",
+ Value: chunks,
+ },
+ {
+ Name: "CHUNK_SIZE",
+ Value: chunkSize,
+ },
+ {
+ Name: "CHUNK_INTERVAL",
+ Value: chunkInterval,
+ },
+ },
+ },
+ },
+ RestartPolicy: api.RestartPolicyNever,
+ },
+ }
+}
+
+func runPortForward(ns, podName string, port int) (*exec.Cmd, int) {
+ cmd := kubectlCmd("port-forward", fmt.Sprintf("--namespace=%v", ns), podName, fmt.Sprintf(":%d", port))
+ // This is somewhat ugly but is the only way to retrieve the port that was picked
+ // by the port-forward command. We don't want to hard code the port as we have no
+ // way of guaranteeing we can pick one that isn't in use, particularly on Jenkins.
+ Logf("starting port-forward command and streaming output")
+ stdout, stderr, err := startCmdAndStreamOutput(cmd)
+ if err != nil {
+ Failf("Failed to start port-forward command: %v", err)
+ }
+ defer stdout.Close()
+ defer stderr.Close()
+
+ buf := make([]byte, 128)
+ var n int
+ Logf("reading from `kubectl port-forward` command's stderr")
+ if n, err = stderr.Read(buf); err != nil {
+ Failf("Failed to read from kubectl port-forward stderr: %v", err)
+ }
+ portForwardOutput := string(buf[:n])
+ match := portForwardRegexp.FindStringSubmatch(portForwardOutput)
+ if len(match) != 2 {
+ Failf("Failed to parse kubectl port-forward output: %s", portForwardOutput)
+ }
+
+ listenPort, err := strconv.Atoi(match[1])
+ if err != nil {
+ Failf("Error converting %s to an int: %v", match[1], err)
+ }
+
+ return cmd, listenPort
+}
+
+var _ = Describe("Port forwarding", func() {
+ defer GinkgoRecover()
+
+ var (
+ c *client.Client
+ ns string
+ testingNs *api.Namespace
+ )
+
+ BeforeEach(func() {
+ var err error
+ c, err = loadClient()
+ expectNoError(err)
+ testingNs, err = createTestingNS("portforward", c)
+ Expect(err).NotTo(HaveOccurred())
+ ns = testingNs.Name
+ })
+
+ AfterEach(func() {
+ By(fmt.Sprintf("Destroying namespace for this suite %v", ns))
+ if err := deleteNS(c, ns); err != nil {
+ Failf("Couldn't delete ns %s", err)
+ }
+ })
+
+ Describe("Client connects, sends no data, disconnects", func() {
+ BeforeEach(func() {
+ By("creating the pod")
+ pod := pfPod("abc", "1", "1", "1")
+ c.Pods(ns).Create(pod)
+ checkPodsRunningReady(c, ns, []string{podName}, podStartTimeout)
+
+ })
+
+ AfterEach(func() {
+ c.Pods(ns).Delete(podName, api.NewDeleteOptions(0))
+ })
+
+ It("should exit cleanly", func() {
+ cmd, listenPort := runPortForward(ns, podName, 80)
+ defer tryKill(cmd)
+
+ conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", listenPort))
+ if err != nil {
+ Failf("Couldn't connect to port %d: %v", listenPort, err)
+ }
+ conn.Close()
+
+ logOutput := runKubectl("logs", fmt.Sprintf("--namespace=%v", ns), podName)
+ if !strings.Contains(logOutput, "Accepted client connection") {
+ Failf("Missing 'Accepted client connection' from log: %s", logOutput)
+ }
+
+ pod, err := c.Pods(ns).Get(podName)
+ if err != nil {
+ Failf("Error retrieving pod: %v", err)
+ }
+
+ state := pod.Status.ContainerStatuses[0].State
+ if state.Terminated == nil {
+ Failf("Expected pod to be terminated. Got %#v", state)
+ }
+
+ if e, a := 2, state.Terminated.ExitCode; e != a {
+ Failf("Expected container exit code %d, got %d", e, a)
+ }
+ })
+ })
+})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment