-
-
Save ncdc/757b6504bab026e71bdf to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
commit 7f0637c4b5703b8d2df0ac535f69addb5703820f | |
Author: Andy Goldstein <agoldste@redhat.com> | |
Date: Wed Aug 5 12:46:11 2015 -0400 | |
Correct port-forward data copying logic | |
Correct port-forward data copying logic so that the server closes its | |
half of the data stream when socat exits, and the client closes its half | |
of the data stream when it finishes writing. | |
Modify the client to wait for both copies (client->server, | |
server->client) to finish before it unblocks. | |
diff --git a/contrib/for-tests/port-forward-tester/.gitignore b/contrib/for-tests/port-forward-tester/.gitignore | |
new file mode 100644 | |
index 0000000..eb23f7e | |
--- /dev/null | |
+++ b/contrib/for-tests/port-forward-tester/.gitignore | |
@@ -0,0 +1 @@ | |
+pftester | |
diff --git a/contrib/for-tests/port-forward-tester/Dockerfile b/contrib/for-tests/port-forward-tester/Dockerfile | |
new file mode 100644 | |
index 0000000..239cf57 | |
--- /dev/null | |
+++ b/contrib/for-tests/port-forward-tester/Dockerfile | |
@@ -0,0 +1,17 @@ | |
+# Copyright 2015 The Kubernetes Authors All rights reserved. | |
+# | |
+# Licensed under the Apache License, Version 2.0 (the "License"); | |
+# you may not use this file except in compliance with the License. | |
+# You may obtain a copy of the License at | |
+# | |
+# http://www.apache.org/licenses/LICENSE-2.0 | |
+# | |
+# Unless required by applicable law or agreed to in writing, software | |
+# distributed under the License is distributed on an "AS IS" BASIS, | |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+# See the License for the specific language governing permissions and | |
+# limitations under the License. | |
+ | |
+FROM scratch | |
+ADD pftester pftester | |
+ENTRYPOINT ["/pftester"] | |
diff --git a/contrib/for-tests/port-forward-tester/Makefile b/contrib/for-tests/port-forward-tester/Makefile | |
new file mode 100644 | |
index 0000000..aa479d5 | |
--- /dev/null | |
+++ b/contrib/for-tests/port-forward-tester/Makefile | |
@@ -0,0 +1,15 @@ | |
+all: push | |
+ | |
+TAG = 0.1 | |
+ | |
+pftester: pftester.go | |
+ CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -ldflags '-w' ./pftester.go | |
+ | |
+image: pftester | |
+ sudo docker build -t ncdc/pftester:$(TAG) . | |
+ | |
+push: image | |
+ sudo docker push ncdc/pftester:$(TAG) | |
+ | |
+clean: | |
+ rm -f pftester | |
diff --git a/contrib/for-tests/port-forward-tester/pftester.go b/contrib/for-tests/port-forward-tester/pftester.go | |
new file mode 100644 | |
index 0000000..ef8be40 | |
--- /dev/null | |
+++ b/contrib/for-tests/port-forward-tester/pftester.go | |
@@ -0,0 +1,93 @@ | |
+/* | |
+Copyright 2015 The Kubernetes Authors All rights reserved. | |
+ | |
+Licensed under the Apache License, Version 2.0 (the "License"); | |
+you may not use this file except in compliance with the License. | |
+You may obtain a copy of the License at | |
+ | |
+ http://www.apache.org/licenses/LICENSE-2.0 | |
+ | |
+Unless required by applicable law or agreed to in writing, software | |
+distributed under the License is distributed on an "AS IS" BASIS, | |
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+See the License for the specific language governing permissions and | |
+limitations under the License. | |
+*/ | |
+ | |
+package main | |
+ | |
+import ( | |
+ "fmt" | |
+ "net" | |
+ "os" | |
+ "strconv" | |
+ "strings" | |
+ "time" | |
+) | |
+ | |
+func getEnvInt(name string) int { | |
+ s := os.Getenv(name) | |
+ value, err := strconv.Atoi(s) | |
+ if err != nil { | |
+ fmt.Printf("Error parsing %s %q: %v\n", name, s, err) | |
+ os.Exit(1) | |
+ } | |
+ return value | |
+} | |
+ | |
+func main() { | |
+ bindPort := os.Getenv("BIND_PORT") | |
+ listener, err := net.Listen("tcp", fmt.Sprintf("localhost:%s", bindPort)) | |
+ if err != nil { | |
+ fmt.Printf("Error listening: %v\n", err) | |
+ os.Exit(1) | |
+ } | |
+ | |
+ conn, err := listener.Accept() | |
+ if err != nil { | |
+ fmt.Printf("Error accepting connection: %v\n", err) | |
+ os.Exit(1) | |
+ } | |
+ defer conn.Close() | |
+ fmt.Println("Accepted client connection") | |
+ | |
+ expectedClientData := os.Getenv("EXPECTED_CLIENT_DATA") | |
+ if len(expectedClientData) > 0 { | |
+ buf := make([]byte, len(expectedClientData)) | |
+ read, err := conn.Read(buf) | |
+ if read != len(expectedClientData) { | |
+ fmt.Printf("Expected to read %d bytes from client, but got %d instead. err=%v\n", len(expectedClientData), read, err) | |
+ os.Exit(2) | |
+ } | |
+ if expectedClientData != string(buf) { | |
+ fmt.Printf("Expect to read %q, but got %q. err=%v\n", expectedClientData, string(buf), err) | |
+ os.Exit(3) | |
+ } | |
+ if err != nil { | |
+ fmt.Printf("Read err: %v\n", err) | |
+ } | |
+ } | |
+ | |
+ chunks := getEnvInt("CHUNKS") | |
+ chunkSize := getEnvInt("CHUNK_SIZE") | |
+ chunkInterval := getEnvInt("CHUNK_INTERVAL") | |
+ | |
+ stringData := strings.Repeat("x", chunkSize) | |
+ data := []byte(stringData) | |
+ | |
+ for i := 0; i < chunks; i++ { | |
+ written, err := conn.Write(data) | |
+ if written != chunkSize { | |
+ fmt.Printf("Expected to write %d bytes from client, but wrote %d instead. err=%v\n", chunkSize, written, err) | |
+ os.Exit(4) | |
+ } | |
+ if err != nil { | |
+ fmt.Printf("Write err: %v\n", err) | |
+ } | |
+ if i+1 < chunks { | |
+ time.Sleep(time.Duration(chunkInterval) * time.Second) | |
+ } | |
+ } | |
+ | |
+ fmt.Println("Done") | |
+} | |
diff --git a/pkg/client/portforward/portforward.go b/pkg/client/portforward/portforward.go | |
index a7dfe15..6efd163 100644 | |
--- a/pkg/client/portforward/portforward.go | |
+++ b/pkg/client/portforward/portforward.go | |
@@ -285,26 +285,38 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) { | |
defer dataStream.Reset() | |
go func() { | |
- // Copy from the remote side to the local port. We won't get an EOF from | |
- // the server as it has no way of knowing when to close the stream. We'll | |
- // take care of closing both ends of the stream with the call to | |
- // stream.Reset() when this function exits. | |
- if _, err := io.Copy(conn, dataStream); err != nil && err != io.EOF && !strings.Contains(err.Error(), "use of closed network connection") { | |
+ // Copy from the remote side to the local port. | |
+ _, err := io.Copy(conn, dataStream) | |
+ if err != nil { | |
glog.Errorf("Error copying from remote stream to local connection: %v", err) | |
} | |
+ | |
+ // inform the select below that the remote copy is done | |
doneChan <- struct{}{} | |
}() | |
go func() { | |
- // Copy from the local port to the remote side. Here we will be able to know | |
- // when the Copy gets an EOF from conn, as that will happen as soon as conn is | |
- // closed (i.e. client disconnected). | |
- if _, err := io.Copy(dataStream, conn); err != nil && err != io.EOF && !strings.Contains(err.Error(), "use of closed network connection") { | |
- glog.Errorf("Error copying from local connection to remote stream: %v", err) | |
+ // Copy from the local port to the remote side. | |
+ _, err := io.Copy(dataStream, conn) | |
+ | |
+ // inform server we're not sending any more data | |
+ dataStream.Close() | |
+ | |
+ if err == nil { | |
+ // clean copy - wait for server to finish | |
+ return | |
} | |
+ | |
+ glog.Errorf("Error copying from local connection to remote stream: %v", err) | |
+ | |
+ // shutdown the stream in both directions | |
+ dataStream.Reset() | |
+ | |
+ // break out of the select below | |
doneChan <- struct{}{} | |
}() | |
+ // wait for either an error or for copying from the remote side to finish | |
select { | |
case err := <-errorChan: | |
glog.Error(err) | |
diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go | |
index b845191..06eb2e8 100644 | |
--- a/pkg/kubelet/dockertools/manager.go | |
+++ b/pkg/kubelet/dockertools/manager.go | |
@@ -1076,6 +1076,10 @@ func noPodInfraContainerError(podName, podNamespace string) error { | |
// - should we support nsenter + socat on the host? (current impl) | |
// - should we support nsenter + socat in a container, running with elevated privs and --pid=host? | |
func (dm *DockerManager) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error { | |
+ // We need to close our half of the stream when socat exits so the client | |
+ // knows we aren't going to send any more data | |
+ defer stream.Close() | |
+ | |
podInfraContainer := pod.FindContainerByName(PodInfraContainerName) | |
if podInfraContainer == nil { | |
return noPodInfraContainerError(pod.Name, pod.Namespace) | |
@@ -1090,16 +1094,39 @@ func (dm *DockerManager) PortForward(pod *kubecontainer.Pod, port uint16, stream | |
} | |
containerPid := container.State.Pid | |
- // TODO what if the host doesn't have it??? | |
_, lookupErr := exec.LookPath("socat") | |
if lookupErr != nil { | |
- return fmt.Errorf("Unable to do port forwarding: socat not found.") | |
+ return fmt.Errorf("unable to do port forwarding: socat not found.") | |
} | |
+ | |
args := []string{"-t", fmt.Sprintf("%d", containerPid), "-n", "socat", "-", fmt.Sprintf("TCP4:localhost:%d", port)} | |
- // TODO use exec.LookPath | |
+ | |
+ _, lookupErr = exec.LookPath("nsenter") | |
+ if lookupErr != nil { | |
+ return fmt.Errorf("unable to do port forwarding: nsenter not found.") | |
+ } | |
+ | |
command := exec.Command("nsenter", args...) | |
- command.Stdin = stream | |
command.Stdout = stream | |
+ | |
+ // If we use Stdin, command.Run() won't return until the goroutine that's copying | |
+ // from stream finishes. Unfortunately, if you have a client like telnet connected | |
+ // via port forwarding, as long as the user's telnet client is connected to the user's | |
+ // local listener that port forwarding sets up, the telnet session never exits. This | |
+ // means that even if socat has finished running, command.Run() won't ever return | |
+ // (because the client still has the connection and stream open). | |
+ // | |
+ // The work around is to use StdinPipe(), as Wait() (called by Run()) closes the pipe | |
+ // when the command (socat) exits. | |
+ inPipe, err := command.StdinPipe() | |
+ if err != nil { | |
+ return fmt.Errorf("unable to do port forwarding: error creating stdin pipe: %v", err) | |
+ } | |
+ go func() { | |
+ io.Copy(inPipe, stream) | |
+ inPipe.Close() | |
+ }() | |
+ | |
return command.Run() | |
} | |
diff --git a/pkg/kubelet/dockertools/manager_test.go b/pkg/kubelet/dockertools/manager_test.go | |
index 137988c..be2c50b 100644 | |
--- a/pkg/kubelet/dockertools/manager_test.go | |
+++ b/pkg/kubelet/dockertools/manager_test.go | |
@@ -1926,6 +1926,12 @@ func TestSyncPodEventHandlerFails(t *testing.T) { | |
} | |
} | |
+type fakeReadWriteCloser struct{} | |
+ | |
+func (*fakeReadWriteCloser) Read([]byte) (int, error) { return 0, nil } | |
+func (*fakeReadWriteCloser) Write([]byte) (int, error) { return 0, nil } | |
+func (*fakeReadWriteCloser) Close() error { return nil } | |
+ | |
func TestPortForwardNoSuchContainer(t *testing.T) { | |
dm, _ := newTestDockerManager() | |
@@ -1938,7 +1944,8 @@ func TestPortForwardNoSuchContainer(t *testing.T) { | |
Containers: nil, | |
}, | |
5000, | |
- nil, | |
+ // need a valid io.ReadWriteCloser here | |
+ &fakeReadWriteCloser{}, | |
) | |
if err == nil { | |
t.Fatal("unexpected non-error") | |
diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go | |
index 4092f38..5d9908e 100644 | |
--- a/pkg/kubelet/rkt/rkt.go | |
+++ b/pkg/kubelet/rkt/rkt.go | |
@@ -1060,6 +1060,10 @@ func (r *runtime) findRktID(pod *kubecontainer.Pod) (string, error) { | |
func (r *runtime) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error { | |
glog.V(4).Infof("Rkt port forwarding in container.") | |
+ // We need to close our half of the stream when socat exits so the client | |
+ // knows we aren't going to send any more data | |
+ defer stream.Close() | |
+ | |
podInfos, err := r.getPodInfos() | |
if err != nil { | |
return err | |
@@ -1082,15 +1086,32 @@ func (r *runtime) PortForward(pod *kubecontainer.Pod, port uint16, stream io.Rea | |
if lookupErr != nil { | |
return fmt.Errorf("unable to do port forwarding: socat not found.") | |
} | |
+ | |
args := []string{"-t", fmt.Sprintf("%d", info.pid), "-n", "socat", "-", fmt.Sprintf("TCP4:localhost:%d", port)} | |
_, lookupErr = exec.LookPath("nsenter") | |
if lookupErr != nil { | |
return fmt.Errorf("unable to do port forwarding: nsenter not found.") | |
} | |
+ | |
command := exec.Command("nsenter", args...) | |
- command.Stdin = stream | |
command.Stdout = stream | |
+ | |
+ // If we use Stdin, command.Run() won't return until the goroutine that's copying | |
+ // from stream finishes. Unfortunately, if you have a client like telnet connected | |
+ // via port forwarding, as long as the user's telnet client is connected to the user's | |
+ // local listener that port forwarding sets up, the telnet session never exits. This | |
+ // means that even if socat has finished running, command.Run() won't ever return | |
+ // (because the client still has the connection and stream open). | |
+ // | |
+ // The work around is to use StdinPipe(), as Wait() (called by Run()) closes the pipe | |
+ // when the command (socat) exits. | |
+ inPipe, err := command.StdinPipe() | |
+ if err != nil { | |
+ return fmt.Errorf("unable to do port forwarding: error creating stdin pipe: %v", err) | |
+ } | |
+ go io.Copy(inPipe, stream) | |
+ | |
return command.Run() | |
} | |
diff --git a/test/e2e/portforward.go b/test/e2e/portforward.go | |
new file mode 100644 | |
index 0000000..5cf4f75 | |
--- /dev/null | |
+++ b/test/e2e/portforward.go | |
@@ -0,0 +1,180 @@ | |
+/* | |
+Copyright 2015 The Kubernetes Authors All rights reserved. | |
+ | |
+Licensed under the Apache License, Version 2.0 (the "License"); | |
+you may not use this file except in compliance with the License. | |
+You may obtain a copy of the License at | |
+ | |
+ http://www.apache.org/licenses/LICENSE-2.0 | |
+ | |
+Unless required by applicable law or agreed to in writing, software | |
+distributed under the License is distributed on an "AS IS" BASIS, | |
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+See the License for the specific language governing permissions and | |
+limitations under the License. | |
+*/ | |
+ | |
+package e2e | |
+ | |
+import ( | |
+ "fmt" | |
+ "net" | |
+ "os/exec" | |
+ "strconv" | |
+ "strings" | |
+ | |
+ "k8s.io/kubernetes/pkg/api" | |
+ "k8s.io/kubernetes/pkg/client" | |
+ | |
+ . "github.com/onsi/ginkgo" | |
+ . "github.com/onsi/gomega" | |
+) | |
+ | |
+const ( | |
+ portForwardTestImage = "gcr.io/google_containers/pftest" | |
+ podName = "pfpod" | |
+) | |
+ | |
+func pfPod(expectedClientData, chunks, chunkSize, chunkInterval string) *api.Pod { | |
+ return &api.Pod{ | |
+ ObjectMeta: api.ObjectMeta{ | |
+ Name: podName, | |
+ Labels: map[string]string{"name": podName}, | |
+ }, | |
+ Spec: api.PodSpec{ | |
+ Containers: []api.Container{ | |
+ { | |
+ Name: "pf", | |
+ //Image: "gcr.io/google_containers/pftester", | |
+ Image: "ncdc/pftester:0.1", | |
+ Env: []api.EnvVar{ | |
+ { | |
+ Name: "BIND_PORT", | |
+ Value: "80", | |
+ }, | |
+ { | |
+ Name: "EXPECTED_CLIENT_DATA", | |
+ Value: expectedClientData, | |
+ }, | |
+ { | |
+ Name: "CHUNKS", | |
+ Value: chunks, | |
+ }, | |
+ { | |
+ Name: "CHUNK_SIZE", | |
+ Value: chunkSize, | |
+ }, | |
+ { | |
+ Name: "CHUNK_INTERVAL", | |
+ Value: chunkInterval, | |
+ }, | |
+ }, | |
+ }, | |
+ }, | |
+ RestartPolicy: api.RestartPolicyNever, | |
+ }, | |
+ } | |
+} | |
+ | |
+func runPortForward(ns, podName string, port int) (*exec.Cmd, int) { | |
+ cmd := kubectlCmd("port-forward", fmt.Sprintf("--namespace=%v", ns), podName, fmt.Sprintf(":%d", port)) | |
+ // This is somewhat ugly but is the only way to retrieve the port that was picked | |
+ // by the port-forward command. We don't want to hard code the port as we have no | |
+ // way of guaranteeing we can pick one that isn't in use, particularly on Jenkins. | |
+ Logf("starting port-forward command and streaming output") | |
+ stdout, stderr, err := startCmdAndStreamOutput(cmd) | |
+ if err != nil { | |
+ Failf("Failed to start port-forward command: %v", err) | |
+ } | |
+ defer stdout.Close() | |
+ defer stderr.Close() | |
+ | |
+ buf := make([]byte, 128) | |
+ var n int | |
+ Logf("reading from `kubectl port-forward` command's stderr") | |
+ if n, err = stderr.Read(buf); err != nil { | |
+ Failf("Failed to read from kubectl port-forward stderr: %v", err) | |
+ } | |
+ portForwardOutput := string(buf[:n]) | |
+ match := portForwardRegexp.FindStringSubmatch(portForwardOutput) | |
+ if len(match) != 2 { | |
+ Failf("Failed to parse kubectl port-forward output: %s", portForwardOutput) | |
+ } | |
+ | |
+ listenPort, err := strconv.Atoi(match[1]) | |
+ if err != nil { | |
+ Failf("Error converting %s to an int: %v", match[1], err) | |
+ } | |
+ | |
+ return cmd, listenPort | |
+} | |
+ | |
+var _ = Describe("Port forwarding", func() { | |
+ defer GinkgoRecover() | |
+ | |
+ var ( | |
+ c *client.Client | |
+ ns string | |
+ testingNs *api.Namespace | |
+ ) | |
+ | |
+ BeforeEach(func() { | |
+ var err error | |
+ c, err = loadClient() | |
+ expectNoError(err) | |
+ testingNs, err = createTestingNS("portforward", c) | |
+ Expect(err).NotTo(HaveOccurred()) | |
+ ns = testingNs.Name | |
+ }) | |
+ | |
+ AfterEach(func() { | |
+ By(fmt.Sprintf("Destroying namespace for this suite %v", ns)) | |
+ if err := deleteNS(c, ns); err != nil { | |
+ Failf("Couldn't delete ns %s", err) | |
+ } | |
+ }) | |
+ | |
+ Describe("Client connects, sends no data, disconnects", func() { | |
+ BeforeEach(func() { | |
+ By("creating the pod") | |
+ pod := pfPod("abc", "1", "1", "1") | |
+ c.Pods(ns).Create(pod) | |
+ checkPodsRunningReady(c, ns, []string{podName}, podStartTimeout) | |
+ | |
+ }) | |
+ | |
+ AfterEach(func() { | |
+ c.Pods(ns).Delete(podName, api.NewDeleteOptions(0)) | |
+ }) | |
+ | |
+ It("should exit cleanly", func() { | |
+ cmd, listenPort := runPortForward(ns, podName, 80) | |
+ defer tryKill(cmd) | |
+ | |
+ conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", listenPort)) | |
+ if err != nil { | |
+ Failf("Couldn't connect to port %d: %v", listenPort, err) | |
+ } | |
+ conn.Close() | |
+ | |
+ logOutput := runKubectl("logs", fmt.Sprintf("--namespace=%v", ns), podName) | |
+ if !strings.Contains(logOutput, "Accepted client connection") { | |
+ Failf("Missing 'Accepted client connection' from log: %s", logOutput) | |
+ } | |
+ | |
+ pod, err := c.Pods(ns).Get(podName) | |
+ if err != nil { | |
+ Failf("Error retrieving pod: %v", err) | |
+ } | |
+ | |
+ state := pod.Status.ContainerStatuses[0].State | |
+ if state.Terminated == nil { | |
+ Failf("Expected pod to be terminated. Got %#v", state) | |
+ } | |
+ | |
+ if e, a := 2, state.Terminated.ExitCode; e != a { | |
+ Failf("Expected container exit code %d, got %d", e, a) | |
+ } | |
+ }) | |
+ }) | |
+}) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
commit 7f0637c4b5703b8d2df0ac535f69addb5703820f | |
Author: Andy Goldstein <agoldste@redhat.com> | |
Date: Wed Aug 5 12:46:11 2015 -0400 | |
Correct port-forward data copying logic | |
Correct port-forward data copying logic so that the server closes its | |
half of the data stream when socat exits, and the client closes its half | |
of the data stream when it finishes writing. | |
Modify the client to wait for both copies (client->server, | |
server->client) to finish before it unblocks. | |
diff --git a/contrib/for-tests/port-forward-tester/.gitignore b/contrib/for-tests/port-forward-tester/.gitignore | |
new file mode 100644 | |
index 0000000..eb23f7e | |
--- /dev/null | |
+++ b/contrib/for-tests/port-forward-tester/.gitignore | |
@@ -0,0 +1 @@ | |
+pftester | |
diff --git a/contrib/for-tests/port-forward-tester/Dockerfile b/contrib/for-tests/port-forward-tester/Dockerfile | |
new file mode 100644 | |
index 0000000..239cf57 | |
--- /dev/null | |
+++ b/contrib/for-tests/port-forward-tester/Dockerfile | |
@@ -0,0 +1,17 @@ | |
+# Copyright 2015 The Kubernetes Authors All rights reserved. | |
+# | |
+# Licensed under the Apache License, Version 2.0 (the "License"); | |
+# you may not use this file except in compliance with the License. | |
+# You may obtain a copy of the License at | |
+# | |
+# http://www.apache.org/licenses/LICENSE-2.0 | |
+# | |
+# Unless required by applicable law or agreed to in writing, software | |
+# distributed under the License is distributed on an "AS IS" BASIS, | |
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+# See the License for the specific language governing permissions and | |
+# limitations under the License. | |
+ | |
+FROM scratch | |
+ADD pftester pftester | |
+ENTRYPOINT ["/pftester"] | |
diff --git a/contrib/for-tests/port-forward-tester/Makefile b/contrib/for-tests/port-forward-tester/Makefile | |
new file mode 100644 | |
index 0000000..aa479d5 | |
--- /dev/null | |
+++ b/contrib/for-tests/port-forward-tester/Makefile | |
@@ -0,0 +1,15 @@ | |
+all: push | |
+ | |
+TAG = 0.1 | |
+ | |
+pftester: pftester.go | |
+ CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -ldflags '-w' ./pftester.go | |
+ | |
+image: pftester | |
+ sudo docker build -t ncdc/pftester:$(TAG) . | |
+ | |
+push: image | |
+ sudo docker push ncdc/pftester:$(TAG) | |
+ | |
+clean: | |
+ rm -f pftester | |
diff --git a/contrib/for-tests/port-forward-tester/pftester.go b/contrib/for-tests/port-forward-tester/pftester.go | |
new file mode 100644 | |
index 0000000..ef8be40 | |
--- /dev/null | |
+++ b/contrib/for-tests/port-forward-tester/pftester.go | |
@@ -0,0 +1,93 @@ | |
+/* | |
+Copyright 2015 The Kubernetes Authors All rights reserved. | |
+ | |
+Licensed under the Apache License, Version 2.0 (the "License"); | |
+you may not use this file except in compliance with the License. | |
+You may obtain a copy of the License at | |
+ | |
+ http://www.apache.org/licenses/LICENSE-2.0 | |
+ | |
+Unless required by applicable law or agreed to in writing, software | |
+distributed under the License is distributed on an "AS IS" BASIS, | |
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+See the License for the specific language governing permissions and | |
+limitations under the License. | |
+*/ | |
+ | |
+package main | |
+ | |
+import ( | |
+ "fmt" | |
+ "net" | |
+ "os" | |
+ "strconv" | |
+ "strings" | |
+ "time" | |
+) | |
+ | |
+func getEnvInt(name string) int { | |
+ s := os.Getenv(name) | |
+ value, err := strconv.Atoi(s) | |
+ if err != nil { | |
+ fmt.Printf("Error parsing %s %q: %v\n", name, s, err) | |
+ os.Exit(1) | |
+ } | |
+ return value | |
+} | |
+ | |
+func main() { | |
+ bindPort := os.Getenv("BIND_PORT") | |
+ listener, err := net.Listen("tcp", fmt.Sprintf("localhost:%s", bindPort)) | |
+ if err != nil { | |
+ fmt.Printf("Error listening: %v\n", err) | |
+ os.Exit(1) | |
+ } | |
+ | |
+ conn, err := listener.Accept() | |
+ if err != nil { | |
+ fmt.Printf("Error accepting connection: %v\n", err) | |
+ os.Exit(1) | |
+ } | |
+ defer conn.Close() | |
+ fmt.Println("Accepted client connection") | |
+ | |
+ expectedClientData := os.Getenv("EXPECTED_CLIENT_DATA") | |
+ if len(expectedClientData) > 0 { | |
+ buf := make([]byte, len(expectedClientData)) | |
+ read, err := conn.Read(buf) | |
+ if read != len(expectedClientData) { | |
+ fmt.Printf("Expected to read %d bytes from client, but got %d instead. err=%v\n", len(expectedClientData), read, err) | |
+ os.Exit(2) | |
+ } | |
+ if expectedClientData != string(buf) { | |
+ fmt.Printf("Expect to read %q, but got %q. err=%v\n", expectedClientData, string(buf), err) | |
+ os.Exit(3) | |
+ } | |
+ if err != nil { | |
+ fmt.Printf("Read err: %v\n", err) | |
+ } | |
+ } | |
+ | |
+ chunks := getEnvInt("CHUNKS") | |
+ chunkSize := getEnvInt("CHUNK_SIZE") | |
+ chunkInterval := getEnvInt("CHUNK_INTERVAL") | |
+ | |
+ stringData := strings.Repeat("x", chunkSize) | |
+ data := []byte(stringData) | |
+ | |
+ for i := 0; i < chunks; i++ { | |
+ written, err := conn.Write(data) | |
+ if written != chunkSize { | |
+ fmt.Printf("Expected to write %d bytes from client, but wrote %d instead. err=%v\n", chunkSize, written, err) | |
+ os.Exit(4) | |
+ } | |
+ if err != nil { | |
+ fmt.Printf("Write err: %v\n", err) | |
+ } | |
+ if i+1 < chunks { | |
+ time.Sleep(time.Duration(chunkInterval) * time.Second) | |
+ } | |
+ } | |
+ | |
+ fmt.Println("Done") | |
+} | |
diff --git a/pkg/client/portforward/portforward.go b/pkg/client/portforward/portforward.go | |
index a7dfe15..6efd163 100644 | |
--- a/pkg/client/portforward/portforward.go | |
+++ b/pkg/client/portforward/portforward.go | |
@@ -285,26 +285,38 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) { | |
defer dataStream.Reset() | |
go func() { | |
- // Copy from the remote side to the local port. We won't get an EOF from | |
- // the server as it has no way of knowing when to close the stream. We'll | |
- // take care of closing both ends of the stream with the call to | |
- // stream.Reset() when this function exits. | |
- if _, err := io.Copy(conn, dataStream); err != nil && err != io.EOF && !strings.Contains(err.Error(), "use of closed network connection") { | |
+ // Copy from the remote side to the local port. | |
+ _, err := io.Copy(conn, dataStream) | |
+ if err != nil { | |
glog.Errorf("Error copying from remote stream to local connection: %v", err) | |
} | |
+ | |
+ // inform the select below that the remote copy is done | |
doneChan <- struct{}{} | |
}() | |
go func() { | |
- // Copy from the local port to the remote side. Here we will be able to know | |
- // when the Copy gets an EOF from conn, as that will happen as soon as conn is | |
- // closed (i.e. client disconnected). | |
- if _, err := io.Copy(dataStream, conn); err != nil && err != io.EOF && !strings.Contains(err.Error(), "use of closed network connection") { | |
- glog.Errorf("Error copying from local connection to remote stream: %v", err) | |
+ // Copy from the local port to the remote side. | |
+ _, err := io.Copy(dataStream, conn) | |
+ | |
+ // inform server we're not sending any more data | |
+ dataStream.Close() | |
+ | |
+ if err == nil { | |
+ // clean copy - wait for server to finish | |
+ return | |
} | |
+ | |
+ glog.Errorf("Error copying from local connection to remote stream: %v", err) | |
+ | |
+ // shutdown the stream in both directions | |
+ dataStream.Reset() | |
+ | |
+ // break out of the select below | |
doneChan <- struct{}{} | |
}() | |
+ // wait for either an error or for copying from the remote side to finish | |
select { | |
case err := <-errorChan: | |
glog.Error(err) | |
diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go | |
index b845191..06eb2e8 100644 | |
--- a/pkg/kubelet/dockertools/manager.go | |
+++ b/pkg/kubelet/dockertools/manager.go | |
@@ -1076,6 +1076,10 @@ func noPodInfraContainerError(podName, podNamespace string) error { | |
// - should we support nsenter + socat on the host? (current impl) | |
// - should we support nsenter + socat in a container, running with elevated privs and --pid=host? | |
func (dm *DockerManager) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error { | |
+ // We need to close our half of the stream when socat exits so the client | |
+ // knows we aren't going to send any more data | |
+ defer stream.Close() | |
+ | |
podInfraContainer := pod.FindContainerByName(PodInfraContainerName) | |
if podInfraContainer == nil { | |
return noPodInfraContainerError(pod.Name, pod.Namespace) | |
@@ -1090,16 +1094,39 @@ func (dm *DockerManager) PortForward(pod *kubecontainer.Pod, port uint16, stream | |
} | |
containerPid := container.State.Pid | |
- // TODO what if the host doesn't have it??? | |
_, lookupErr := exec.LookPath("socat") | |
if lookupErr != nil { | |
- return fmt.Errorf("Unable to do port forwarding: socat not found.") | |
+ return fmt.Errorf("unable to do port forwarding: socat not found.") | |
} | |
+ | |
args := []string{"-t", fmt.Sprintf("%d", containerPid), "-n", "socat", "-", fmt.Sprintf("TCP4:localhost:%d", port)} | |
- // TODO use exec.LookPath | |
+ | |
+ _, lookupErr = exec.LookPath("nsenter") | |
+ if lookupErr != nil { | |
+ return fmt.Errorf("unable to do port forwarding: nsenter not found.") | |
+ } | |
+ | |
command := exec.Command("nsenter", args...) | |
- command.Stdin = stream | |
command.Stdout = stream | |
+ | |
+ // If we use Stdin, command.Run() won't return until the goroutine that's copying | |
+ // from stream finishes. Unfortunately, if you have a client like telnet connected | |
+ // via port forwarding, as long as the user's telnet client is connected to the user's | |
+ // local listener that port forwarding sets up, the telnet session never exits. This | |
+ // means that even if socat has finished running, command.Run() won't ever return | |
+ // (because the client still has the connection and stream open). | |
+ // | |
+ // The work around is to use StdinPipe(), as Wait() (called by Run()) closes the pipe | |
+ // when the command (socat) exits. | |
+ inPipe, err := command.StdinPipe() | |
+ if err != nil { | |
+ return fmt.Errorf("unable to do port forwarding: error creating stdin pipe: %v", err) | |
+ } | |
+ go func() { | |
+ io.Copy(inPipe, stream) | |
+ inPipe.Close() | |
+ }() | |
+ | |
return command.Run() | |
} | |
diff --git a/pkg/kubelet/dockertools/manager_test.go b/pkg/kubelet/dockertools/manager_test.go | |
index 137988c..be2c50b 100644 | |
--- a/pkg/kubelet/dockertools/manager_test.go | |
+++ b/pkg/kubelet/dockertools/manager_test.go | |
@@ -1926,6 +1926,12 @@ func TestSyncPodEventHandlerFails(t *testing.T) { | |
} | |
} | |
+type fakeReadWriteCloser struct{} | |
+ | |
+func (*fakeReadWriteCloser) Read([]byte) (int, error) { return 0, nil } | |
+func (*fakeReadWriteCloser) Write([]byte) (int, error) { return 0, nil } | |
+func (*fakeReadWriteCloser) Close() error { return nil } | |
+ | |
func TestPortForwardNoSuchContainer(t *testing.T) { | |
dm, _ := newTestDockerManager() | |
@@ -1938,7 +1944,8 @@ func TestPortForwardNoSuchContainer(t *testing.T) { | |
Containers: nil, | |
}, | |
5000, | |
- nil, | |
+ // need a valid io.ReadWriteCloser here | |
+ &fakeReadWriteCloser{}, | |
) | |
if err == nil { | |
t.Fatal("unexpected non-error") | |
diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go | |
index 4092f38..5d9908e 100644 | |
--- a/pkg/kubelet/rkt/rkt.go | |
+++ b/pkg/kubelet/rkt/rkt.go | |
@@ -1060,6 +1060,10 @@ func (r *runtime) findRktID(pod *kubecontainer.Pod) (string, error) { | |
func (r *runtime) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error { | |
glog.V(4).Infof("Rkt port forwarding in container.") | |
+ // We need to close our half of the stream when socat exits so the client | |
+ // knows we aren't going to send any more data | |
+ defer stream.Close() | |
+ | |
podInfos, err := r.getPodInfos() | |
if err != nil { | |
return err | |
@@ -1082,15 +1086,32 @@ func (r *runtime) PortForward(pod *kubecontainer.Pod, port uint16, stream io.Rea | |
if lookupErr != nil { | |
return fmt.Errorf("unable to do port forwarding: socat not found.") | |
} | |
+ | |
args := []string{"-t", fmt.Sprintf("%d", info.pid), "-n", "socat", "-", fmt.Sprintf("TCP4:localhost:%d", port)} | |
_, lookupErr = exec.LookPath("nsenter") | |
if lookupErr != nil { | |
return fmt.Errorf("unable to do port forwarding: nsenter not found.") | |
} | |
+ | |
command := exec.Command("nsenter", args...) | |
- command.Stdin = stream | |
command.Stdout = stream | |
+ | |
+ // If we use Stdin, command.Run() won't return until the goroutine that's copying | |
+ // from stream finishes. Unfortunately, if you have a client like telnet connected | |
+ // via port forwarding, as long as the user's telnet client is connected to the user's | |
+ // local listener that port forwarding sets up, the telnet session never exits. This | |
+ // means that even if socat has finished running, command.Run() won't ever return | |
+ // (because the client still has the connection and stream open). | |
+ // | |
+ // The work around is to use StdinPipe(), as Wait() (called by Run()) closes the pipe | |
+ // when the command (socat) exits. | |
+ inPipe, err := command.StdinPipe() | |
+ if err != nil { | |
+ return fmt.Errorf("unable to do port forwarding: error creating stdin pipe: %v", err) | |
+ } | |
+ go io.Copy(inPipe, stream) | |
+ | |
return command.Run() | |
} | |
diff --git a/test/e2e/portforward.go b/test/e2e/portforward.go | |
new file mode 100644 | |
index 0000000..5cf4f75 | |
--- /dev/null | |
+++ b/test/e2e/portforward.go | |
@@ -0,0 +1,180 @@ | |
+/* | |
+Copyright 2015 The Kubernetes Authors All rights reserved. | |
+ | |
+Licensed under the Apache License, Version 2.0 (the "License"); | |
+you may not use this file except in compliance with the License. | |
+You may obtain a copy of the License at | |
+ | |
+ http://www.apache.org/licenses/LICENSE-2.0 | |
+ | |
+Unless required by applicable law or agreed to in writing, software | |
+distributed under the License is distributed on an "AS IS" BASIS, | |
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+See the License for the specific language governing permissions and | |
+limitations under the License. | |
+*/ | |
+ | |
+package e2e | |
+ | |
+import ( | |
+ "fmt" | |
+ "net" | |
+ "os/exec" | |
+ "strconv" | |
+ "strings" | |
+ | |
+ "k8s.io/kubernetes/pkg/api" | |
+ "k8s.io/kubernetes/pkg/client" | |
+ | |
+ . "github.com/onsi/ginkgo" | |
+ . "github.com/onsi/gomega" | |
+) | |
+ | |
+const ( | |
+ portForwardTestImage = "gcr.io/google_containers/pftest" | |
+ podName = "pfpod" | |
+) | |
+ | |
+func pfPod(expectedClientData, chunks, chunkSize, chunkInterval string) *api.Pod { | |
+ return &api.Pod{ | |
+ ObjectMeta: api.ObjectMeta{ | |
+ Name: podName, | |
+ Labels: map[string]string{"name": podName}, | |
+ }, | |
+ Spec: api.PodSpec{ | |
+ Containers: []api.Container{ | |
+ { | |
+ Name: "pf", | |
+ //Image: "gcr.io/google_containers/pftester", | |
+ Image: "ncdc/pftester:0.1", | |
+ Env: []api.EnvVar{ | |
+ { | |
+ Name: "BIND_PORT", | |
+ Value: "80", | |
+ }, | |
+ { | |
+ Name: "EXPECTED_CLIENT_DATA", | |
+ Value: expectedClientData, | |
+ }, | |
+ { | |
+ Name: "CHUNKS", | |
+ Value: chunks, | |
+ }, | |
+ { | |
+ Name: "CHUNK_SIZE", | |
+ Value: chunkSize, | |
+ }, | |
+ { | |
+ Name: "CHUNK_INTERVAL", | |
+ Value: chunkInterval, | |
+ }, | |
+ }, | |
+ }, | |
+ }, | |
+ RestartPolicy: api.RestartPolicyNever, | |
+ }, | |
+ } | |
+} | |
+ | |
+func runPortForward(ns, podName string, port int) (*exec.Cmd, int) { | |
+ cmd := kubectlCmd("port-forward", fmt.Sprintf("--namespace=%v", ns), podName, fmt.Sprintf(":%d", port)) | |
+ // This is somewhat ugly but is the only way to retrieve the port that was picked | |
+ // by the port-forward command. We don't want to hard code the port as we have no | |
+ // way of guaranteeing we can pick one that isn't in use, particularly on Jenkins. | |
+ Logf("starting port-forward command and streaming output") | |
+ stdout, stderr, err := startCmdAndStreamOutput(cmd) | |
+ if err != nil { | |
+ Failf("Failed to start port-forward command: %v", err) | |
+ } | |
+ defer stdout.Close() | |
+ defer stderr.Close() | |
+ | |
+ buf := make([]byte, 128) | |
+ var n int | |
+ Logf("reading from `kubectl port-forward` command's stderr") | |
+ if n, err = stderr.Read(buf); err != nil { | |
+ Failf("Failed to read from kubectl port-forward stderr: %v", err) | |
+ } | |
+ portForwardOutput := string(buf[:n]) | |
+ match := portForwardRegexp.FindStringSubmatch(portForwardOutput) | |
+ if len(match) != 2 { | |
+ Failf("Failed to parse kubectl port-forward output: %s", portForwardOutput) | |
+ } | |
+ | |
+ listenPort, err := strconv.Atoi(match[1]) | |
+ if err != nil { | |
+ Failf("Error converting %s to an int: %v", match[1], err) | |
+ } | |
+ | |
+ return cmd, listenPort | |
+} | |
+ | |
+var _ = Describe("Port forwarding", func() { | |
+ defer GinkgoRecover() | |
+ | |
+ var ( | |
+ c *client.Client | |
+ ns string | |
+ testingNs *api.Namespace | |
+ ) | |
+ | |
+ BeforeEach(func() { | |
+ var err error | |
+ c, err = loadClient() | |
+ expectNoError(err) | |
+ testingNs, err = createTestingNS("portforward", c) | |
+ Expect(err).NotTo(HaveOccurred()) | |
+ ns = testingNs.Name | |
+ }) | |
+ | |
+ AfterEach(func() { | |
+ By(fmt.Sprintf("Destroying namespace for this suite %v", ns)) | |
+ if err := deleteNS(c, ns); err != nil { | |
+ Failf("Couldn't delete ns %s", err) | |
+ } | |
+ }) | |
+ | |
+ Describe("Client connects, sends no data, disconnects", func() { | |
+ BeforeEach(func() { | |
+ By("creating the pod") | |
+ pod := pfPod("abc", "1", "1", "1") | |
+ c.Pods(ns).Create(pod) | |
+ checkPodsRunningReady(c, ns, []string{podName}, podStartTimeout) | |
+ | |
+ }) | |
+ | |
+ AfterEach(func() { | |
+ c.Pods(ns).Delete(podName, api.NewDeleteOptions(0)) | |
+ }) | |
+ | |
+ It("should exit cleanly", func() { | |
+ cmd, listenPort := runPortForward(ns, podName, 80) | |
+ defer tryKill(cmd) | |
+ | |
+ conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", listenPort)) | |
+ if err != nil { | |
+ Failf("Couldn't connect to port %d: %v", listenPort, err) | |
+ } | |
+ conn.Close() | |
+ | |
+ logOutput := runKubectl("logs", fmt.Sprintf("--namespace=%v", ns), podName) | |
+ if !strings.Contains(logOutput, "Accepted client connection") { | |
+ Failf("Missing 'Accepted client connection' from log: %s", logOutput) | |
+ } | |
+ | |
+ pod, err := c.Pods(ns).Get(podName) | |
+ if err != nil { | |
+ Failf("Error retrieving pod: %v", err) | |
+ } | |
+ | |
+ state := pod.Status.ContainerStatuses[0].State | |
+ if state.Terminated == nil { | |
+ Failf("Expected pod to be terminated. Got %#v", state) | |
+ } | |
+ | |
+ if e, a := 2, state.Terminated.ExitCode; e != a { | |
+ Failf("Expected container exit code %d, got %d", e, a) | |
+ } | |
+ }) | |
+ }) | |
+}) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment