Add portforward support.
Signed-off-by: Lantao Liu <lantaol@google.com>
This commit is contained in:
		| @@ -14,6 +14,7 @@ install: | ||||
|     - sudo apt-get install btrfs-tools | ||||
|     - sudo apt-get install libseccomp2/trusty-backports | ||||
|     - sudo apt-get install libseccomp-dev/trusty-backports | ||||
|     - sudo apt-get install socat | ||||
|     - docker run --rm -v /usr/local/bin:/target jpetazzo/nsenter | ||||
|  | ||||
| before_script: | ||||
|   | ||||
| @@ -31,9 +31,12 @@ specifications as appropriate. | ||||
| trusty. | ||||
| 2. Install containerd dependencies. | ||||
| * containerd requires installation of a btrfs development library. `btrfs-tools`(Ubuntu, Debian) / `btrfs-progs-devel`(Fedora, CentOS, RHEL) | ||||
| 3. Install and setup a go1.8.x development environment. | ||||
| 4. Make a local clone of this repository. | ||||
| 5. Install binary dependencies by running the following command from your cloned `cri-containerd/` project directory: | ||||
| 3. Install other dependencies: | ||||
| * `nsenter`: Required by CNI and portforward. | ||||
| * `socat`: Required by portforward. | ||||
| 4. Install and setup a go1.8.x development environment. | ||||
| 5. Make a local clone of this repository. | ||||
| 6. Install binary dependencies by running the following command from your cloned `cri-containerd/` project directory: | ||||
| ```shell | ||||
| # Note: install.deps installs the above mentioned runc, containerd, and CNI | ||||
| # binary dependencies. install.deps is only provided for general use and ease of | ||||
|   | ||||
| @@ -22,7 +22,7 @@ ROOT="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"/.. | ||||
| # FOCUS focuses the test to run. | ||||
| FOCUS=${FOCUS:-} | ||||
| # SKIP skips the test to skip. | ||||
| SKIP=${SKIP:-"attach|portforward|RunAsUser|host port"} | ||||
| SKIP=${SKIP:-"attach|RunAsUser|host port"} | ||||
| REPORT_DIR=${REPORT_DIR:-"/tmp"} | ||||
|  | ||||
| if [[ -z "${GOPATH}" ]]; then | ||||
|   | ||||
| @@ -30,7 +30,7 @@ func (c *criContainerdService) Exec(ctx context.Context, r *runtime.ExecRequest) | ||||
| 		r.GetContainerId(), r.GetCmd(), r.GetTty(), r.GetStdin()) | ||||
| 	defer func() { | ||||
| 		if retErr == nil { | ||||
| 			glog.V(2).Infof("Exec for %q returns URL %q", r.GetContainerId(), retRes.Url) | ||||
| 			glog.V(2).Infof("Exec for %q returns URL %q", r.GetContainerId(), retRes.GetUrl()) | ||||
| 		} | ||||
| 	}() | ||||
|  | ||||
|   | ||||
| @@ -17,14 +17,109 @@ limitations under the License. | ||||
| package server | ||||
|  | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"os/exec" | ||||
| 	"strings" | ||||
|  | ||||
| 	"github.com/containerd/containerd" | ||||
| 	"github.com/golang/glog" | ||||
| 	"golang.org/x/net/context" | ||||
|  | ||||
| 	"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" | ||||
| ) | ||||
|  | ||||
| // PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address. | ||||
| func (c *criContainerdService) PortForward(ctx context.Context, r *runtime.PortForwardRequest) (*runtime.PortForwardResponse, error) { | ||||
| 	return nil, errors.New("not implemented") | ||||
| func (c *criContainerdService) PortForward(ctx context.Context, r *runtime.PortForwardRequest) (retRes *runtime.PortForwardResponse, retErr error) { | ||||
| 	// TODO(random-liu): Run a socat container inside the sandbox to do portforward. | ||||
| 	glog.V(2).Infof("Portforward for sandbox %q port %v", r.GetPodSandboxId(), r.GetPort()) | ||||
| 	defer func() { | ||||
| 		if retErr == nil { | ||||
| 			glog.V(2).Infof("Portforward for %q returns URL %q", r.GetPodSandboxId(), retRes.GetUrl()) | ||||
| 		} | ||||
| 	}() | ||||
|  | ||||
| 	sandbox, err := c.sandboxStore.Get(r.GetPodSandboxId()) | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf("failed to find sandbox: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	t, err := sandbox.Container.Task(ctx, nil) | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf("failed to get sandbox container: %v", err) | ||||
| 	} | ||||
| 	status, err := t.Status(ctx) | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf("failed to get sandbox container status: %v", err) | ||||
| 	} | ||||
| 	if status.Status != containerd.Running { | ||||
| 		return nil, errors.New("sandbox container is not running") | ||||
| 	} | ||||
| 	// TODO(random-liu): Verify that ports are exposed. | ||||
| 	return c.streamServer.GetPortForward(r) | ||||
| } | ||||
|  | ||||
| // portForward requires `nsenter` and `socat` on the node, it uses `nsenter` to enter the | ||||
| // sandbox namespace, and run `socat` inside the namespace to forward stream for a specific | ||||
| // port. The `socat` command keeps running until it exits or client disconnect. | ||||
| func (c *criContainerdService) portForward(id string, port int32, stream io.ReadWriteCloser) error { | ||||
| 	s, err := c.sandboxStore.Get(id) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("failed to find sandbox in store: %v", err) | ||||
| 	} | ||||
| 	pid := s.Pid | ||||
|  | ||||
| 	socat, err := exec.LookPath("socat") | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("failed to find socat: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	// Check following links for meaning of the options: | ||||
| 	// * socat: https://linux.die.net/man/1/socat | ||||
| 	// * nsenter: http://man7.org/linux/man-pages/man1/nsenter.1.html | ||||
| 	args := []string{"-t", fmt.Sprintf("%d", pid), "-n", socat, | ||||
| 		"-", fmt.Sprintf("TCP4:localhost:%d", port)} | ||||
|  | ||||
| 	nsenter, err := exec.LookPath("nsenter") | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("failed to find nsenter: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	glog.V(2).Infof("Executing port forwarding command: %s %s", nsenter, strings.Join(args, " ")) | ||||
|  | ||||
| 	cmd := exec.Command(nsenter, args...) | ||||
| 	cmd.Stdout = stream | ||||
|  | ||||
| 	stderr := new(bytes.Buffer) | ||||
| 	cmd.Stderr = stderr | ||||
|  | ||||
| 	// If we use Stdin, command.Run() won't return until the goroutine that's copying | ||||
| 	// from stream finishes. Unfortunately, if you have a client like telnet connected | ||||
| 	// via port forwarding, as long as the user's telnet client is connected to the user's | ||||
| 	// local listener that port forwarding sets up, the telnet session never exits. This | ||||
| 	// means that even if socat has finished running, command.Run() won't ever return | ||||
| 	// (because the client still has the connection and stream open). | ||||
| 	// | ||||
| 	// The work around is to use StdinPipe(), as Wait() (called by Run()) closes the pipe | ||||
| 	// when the command (socat) exits. | ||||
| 	in, err := cmd.StdinPipe() | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("failed to create stdin pipe: %v", err) | ||||
| 	} | ||||
| 	go func() { | ||||
| 		if _, err := io.Copy(in, stream); err != nil { | ||||
| 			glog.Errorf("Failed to copy port forward input for %q port %d: %v", id, port, err) | ||||
| 		} | ||||
| 		in.Close() | ||||
| 		glog.V(4).Infof("Finish copy port forward input for %q port %d: %v", id, port) | ||||
| 	}() | ||||
|  | ||||
| 	if err := cmd.Run(); err != nil { | ||||
| 		return fmt.Errorf("nsenter command returns error: %v, stderr: %q", err, stderr.String()) | ||||
| 	} | ||||
|  | ||||
| 	glog.V(2).Infof("Finish port forwarding for %q port %d", id, port) | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|   | ||||
| @@ -20,6 +20,7 @@ import ( | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"math" | ||||
| 	"net" | ||||
|  | ||||
| 	"golang.org/x/net/context" | ||||
| @@ -83,7 +84,10 @@ func (s *streamRuntime) Attach(containerID string, in io.Reader, out, err io.Wri | ||||
| } | ||||
|  | ||||
| func (s *streamRuntime) PortForward(podSandboxID string, port int32, stream io.ReadWriteCloser) error { | ||||
| 	return errors.New("not implemented") | ||||
| 	if port <= 0 || port > math.MaxUint16 { | ||||
| 		return fmt.Errorf("invalid port %d", port) | ||||
| 	} | ||||
| 	return s.c.portForward(podSandboxID, port, stream) | ||||
| } | ||||
|  | ||||
| // handleResizing spawns a goroutine that processes the resize channel, calling resizeFunc for each | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Lantao Liu
					Lantao Liu