diff --git a/.travis.yml b/.travis.yml index 7568312f9..d98a721e1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -14,6 +14,7 @@ install: - sudo apt-get install btrfs-tools - sudo apt-get install libseccomp2/trusty-backports - sudo apt-get install libseccomp-dev/trusty-backports + - sudo apt-get install socat - docker run --rm -v /usr/local/bin:/target jpetazzo/nsenter before_script: diff --git a/README.md b/README.md index a405997cf..e4a6c8b29 100644 --- a/README.md +++ b/README.md @@ -31,9 +31,12 @@ specifications as appropriate. trusty. 2. Install containerd dependencies. * containerd requires installation of a btrfs development library. `btrfs-tools`(Ubuntu, Debian) / `btrfs-progs-devel`(Fedora, CentOS, RHEL) -3. Install and setup a go1.8.x development environment. -4. Make a local clone of this repository. -5. Install binary dependencies by running the following command from your cloned `cri-containerd/` project directory: +3. Install other dependencies: +* `nsenter`: Required by CNI and portforward. +* `socat`: Required by portforward. +4. Install and setup a go1.8.x development environment. +5. Make a local clone of this repository. +6. Install binary dependencies by running the following command from your cloned `cri-containerd/` project directory: ```shell # Note: install.deps installs the above mentioned runc, containerd, and CNI # binary dependencies. install.deps is only provided for general use and ease of diff --git a/hack/test-cri.sh b/hack/test-cri.sh index 061fbabba..44dae9199 100755 --- a/hack/test-cri.sh +++ b/hack/test-cri.sh @@ -22,7 +22,7 @@ ROOT="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"/.. # FOCUS focuses the test to run. FOCUS=${FOCUS:-} # SKIP skips the test to skip. -SKIP=${SKIP:-"attach|portforward|RunAsUser|host port"} +SKIP=${SKIP:-"attach|RunAsUser|host port"} REPORT_DIR=${REPORT_DIR:-"/tmp"} if [[ -z "${GOPATH}" ]]; then diff --git a/pkg/server/container_exec.go b/pkg/server/container_exec.go index c0a050959..a6f48e499 100644 --- a/pkg/server/container_exec.go +++ b/pkg/server/container_exec.go @@ -30,7 +30,7 @@ func (c *criContainerdService) Exec(ctx context.Context, r *runtime.ExecRequest) r.GetContainerId(), r.GetCmd(), r.GetTty(), r.GetStdin()) defer func() { if retErr == nil { - glog.V(2).Infof("Exec for %q returns URL %q", r.GetContainerId(), retRes.Url) + glog.V(2).Infof("Exec for %q returns URL %q", r.GetContainerId(), retRes.GetUrl()) } }() diff --git a/pkg/server/sandbox_portforward.go b/pkg/server/sandbox_portforward.go index f176cdcfc..b29f084d2 100644 --- a/pkg/server/sandbox_portforward.go +++ b/pkg/server/sandbox_portforward.go @@ -17,14 +17,109 @@ limitations under the License. package server import ( + "bytes" "errors" + "fmt" + "io" + "os/exec" + "strings" + "github.com/containerd/containerd" + "github.com/golang/glog" "golang.org/x/net/context" - "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" ) // PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address. -func (c *criContainerdService) PortForward(ctx context.Context, r *runtime.PortForwardRequest) (*runtime.PortForwardResponse, error) { - return nil, errors.New("not implemented") +func (c *criContainerdService) PortForward(ctx context.Context, r *runtime.PortForwardRequest) (retRes *runtime.PortForwardResponse, retErr error) { + // TODO(random-liu): Run a socat container inside the sandbox to do portforward. + glog.V(2).Infof("Portforward for sandbox %q port %v", r.GetPodSandboxId(), r.GetPort()) + defer func() { + if retErr == nil { + glog.V(2).Infof("Portforward for %q returns URL %q", r.GetPodSandboxId(), retRes.GetUrl()) + } + }() + + sandbox, err := c.sandboxStore.Get(r.GetPodSandboxId()) + if err != nil { + return nil, fmt.Errorf("failed to find sandbox: %v", err) + } + + t, err := sandbox.Container.Task(ctx, nil) + if err != nil { + return nil, fmt.Errorf("failed to get sandbox container: %v", err) + } + status, err := t.Status(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get sandbox container status: %v", err) + } + if status.Status != containerd.Running { + return nil, errors.New("sandbox container is not running") + } + // TODO(random-liu): Verify that ports are exposed. + return c.streamServer.GetPortForward(r) +} + +// portForward requires `nsenter` and `socat` on the node, it uses `nsenter` to enter the +// sandbox namespace, and run `socat` inside the namespace to forward stream for a specific +// port. The `socat` command keeps running until it exits or client disconnect. +func (c *criContainerdService) portForward(id string, port int32, stream io.ReadWriteCloser) error { + s, err := c.sandboxStore.Get(id) + if err != nil { + return fmt.Errorf("failed to find sandbox in store: %v", err) + } + pid := s.Pid + + socat, err := exec.LookPath("socat") + if err != nil { + return fmt.Errorf("failed to find socat: %v", err) + } + + // Check following links for meaning of the options: + // * socat: https://linux.die.net/man/1/socat + // * nsenter: http://man7.org/linux/man-pages/man1/nsenter.1.html + args := []string{"-t", fmt.Sprintf("%d", pid), "-n", socat, + "-", fmt.Sprintf("TCP4:localhost:%d", port)} + + nsenter, err := exec.LookPath("nsenter") + if err != nil { + return fmt.Errorf("failed to find nsenter: %v", err) + } + + glog.V(2).Infof("Executing port forwarding command: %s %s", nsenter, strings.Join(args, " ")) + + cmd := exec.Command(nsenter, args...) + cmd.Stdout = stream + + stderr := new(bytes.Buffer) + cmd.Stderr = stderr + + // If we use Stdin, command.Run() won't return until the goroutine that's copying + // from stream finishes. Unfortunately, if you have a client like telnet connected + // via port forwarding, as long as the user's telnet client is connected to the user's + // local listener that port forwarding sets up, the telnet session never exits. This + // means that even if socat has finished running, command.Run() won't ever return + // (because the client still has the connection and stream open). + // + // The work around is to use StdinPipe(), as Wait() (called by Run()) closes the pipe + // when the command (socat) exits. + in, err := cmd.StdinPipe() + if err != nil { + return fmt.Errorf("failed to create stdin pipe: %v", err) + } + go func() { + if _, err := io.Copy(in, stream); err != nil { + glog.Errorf("Failed to copy port forward input for %q port %d: %v", id, port, err) + } + in.Close() + glog.V(4).Infof("Finish copy port forward input for %q port %d: %v", id, port) + }() + + if err := cmd.Run(); err != nil { + return fmt.Errorf("nsenter command returns error: %v, stderr: %q", err, stderr.String()) + } + + glog.V(2).Infof("Finish port forwarding for %q port %d", id, port) + + return nil } diff --git a/pkg/server/streaming.go b/pkg/server/streaming.go index 29eb9a62e..cf58cc685 100644 --- a/pkg/server/streaming.go +++ b/pkg/server/streaming.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "io" + "math" "net" "golang.org/x/net/context" @@ -83,7 +84,10 @@ func (s *streamRuntime) Attach(containerID string, in io.Reader, out, err io.Wri } func (s *streamRuntime) PortForward(podSandboxID string, port int32, stream io.ReadWriteCloser) error { - return errors.New("not implemented") + if port <= 0 || port > math.MaxUint16 { + return fmt.Errorf("invalid port %d", port) + } + return s.c.portForward(podSandboxID, port, stream) } // handleResizing spawns a goroutine that processes the resize channel, calling resizeFunc for each