From dc413bd6d6cab4af1cac8b1927054300ea09667c Mon Sep 17 00:00:00 2001 From: Angela Li Date: Mon, 23 Sep 2019 17:36:43 -0700 Subject: [PATCH] Add windows portforward support Signed-off-by: Angela Li --- hack/utils.sh | 2 +- pkg/config/config_windows.go | 2 +- pkg/server/container_execsync.go | 56 +++++++++++---------- pkg/server/sandbox_portforward.go | 38 +++++++++++++++ pkg/server/sandbox_portforward_unix.go | 15 ------ pkg/server/sandbox_portforward_windows.go | 59 ++++++++++++++++++++--- pkg/server/streaming.go | 4 +- test/windows/test.sh | 2 +- 8 files changed, 124 insertions(+), 54 deletions(-) create mode 100644 pkg/server/sandbox_portforward.go diff --git a/hack/utils.sh b/hack/utils.sh index 06f865b5e..1bafa3a26 100755 --- a/hack/utils.sh +++ b/hack/utils.sh @@ -17,7 +17,7 @@ ROOT="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"/.. # Not from vendor.conf. -CRITOOL_VERSION=84b8540c69e82671bfc244a0b0fe28a6a98381ce +CRITOOL_VERSION=ad1e8a4f1e0e44b22c1c371b9dc4db92eba6f157 CRITOOL_PKG=github.com/kubernetes-sigs/cri-tools CRITOOL_REPO=github.com/kubernetes-sigs/cri-tools diff --git a/pkg/config/config_windows.go b/pkg/config/config_windows.go index f56514c0b..7a73b985f 100644 --- a/pkg/config/config_windows.go +++ b/pkg/config/config_windows.go @@ -54,7 +54,7 @@ func DefaultConfig() PluginConfig { TLSKeyFile: "", TLSCertFile: "", }, - SandboxImage: "mcr.microsoft.com/k8s/core/pause:1.0.0", + SandboxImage: "mcr.microsoft.com/k8s/core/pause:1.2.0", StatsCollectPeriod: 10, MaxContainerLogLineSize: 16 * 1024, Registry: Registry{ diff --git a/pkg/server/container_execsync.go b/pkg/server/container_execsync.go index a4838166f..d05ffd7d3 100644 --- a/pkg/server/container_execsync.go +++ b/pkg/server/container_execsync.go @@ -70,38 +70,13 @@ type execOptions struct { timeout time.Duration } -// execInContainer executes a command inside the container synchronously, and -// redirects stdio stream properly. -// This function only returns when the exec process exits, this means that: -// 1) As long as the exec process is running, the goroutine in the cri plugin -// will be running and wait for the exit code; -// 2) `kubectl exec -it` will hang until the exec process exits, even after io -// is detached. This is different from dockershim, which leaves the exec process -// running in background after io is detached. -// https://github.com/kubernetes/kubernetes/blob/v1.15.0/pkg/kubelet/dockershim/exec.go#L127 -// For example, if the `kubectl exec -it` process is killed, IO will be closed. In -// this case, the CRI plugin will still have a goroutine waiting for the exec process -// to exit and log the exit code, but dockershim won't. -func (c *criService) execInContainer(ctx context.Context, id string, opts execOptions) (*uint32, error) { +func (c *criService) execInternal(ctx context.Context, container containerd.Container, id string, opts execOptions) (*uint32, error) { // Cancel the context before returning to ensure goroutines are stopped. // This is important, because if `Start` returns error, `Wait` will hang // forever unless we cancel the context. ctx, cancel := context.WithCancel(ctx) defer cancel() - // Get container from our container store. - cntr, err := c.containerStore.Get(id) - if err != nil { - return nil, errors.Wrapf(err, "failed to find container %q in store", id) - } - id = cntr.ID - - state := cntr.Status.Get().State() - if state != runtime.ContainerState_CONTAINER_RUNNING { - return nil, errors.Errorf("container is in %s state", criContainerStateToString(state)) - } - - container := cntr.Container spec, err := container.Spec(ctx) if err != nil { return nil, errors.Wrap(err, "failed to get container spec") @@ -205,3 +180,32 @@ func (c *criService) execInContainer(ctx context.Context, id string, opts execOp return &code, nil } } + +// execInContainer executes a command inside the container synchronously, and +// redirects stdio stream properly. +// This function only returns when the exec process exits, this means that: +// 1) As long as the exec process is running, the goroutine in the cri plugin +// will be running and wait for the exit code; +// 2) `kubectl exec -it` will hang until the exec process exits, even after io +// is detached. This is different from dockershim, which leaves the exec process +// running in background after io is detached. +// https://github.com/kubernetes/kubernetes/blob/v1.15.0/pkg/kubelet/dockershim/exec.go#L127 +// For example, if the `kubectl exec -it` process is killed, IO will be closed. In +// this case, the CRI plugin will still have a goroutine waiting for the exec process +// to exit and log the exit code, but dockershim won't. +func (c *criService) execInContainer(ctx context.Context, id string, opts execOptions) (*uint32, error) { + // Get container from our container store. + cntr, err := c.containerStore.Get(id) + + if err != nil { + return nil, errors.Wrapf(err, "failed to find container %q in store", id) + } + id = cntr.ID + + state := cntr.Status.Get().State() + if state != runtime.ContainerState_CONTAINER_RUNNING { + return nil, errors.Errorf("container is in %s state", criContainerStateToString(state)) + } + + return c.execInternal(ctx, cntr.Container, id, opts) +} diff --git a/pkg/server/sandbox_portforward.go b/pkg/server/sandbox_portforward.go new file mode 100644 index 000000000..b5d3d5c02 --- /dev/null +++ b/pkg/server/sandbox_portforward.go @@ -0,0 +1,38 @@ +/* +Copyright The containerd Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package server + +import ( + "github.com/pkg/errors" + "golang.org/x/net/context" + runtime "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" + + sandboxstore "github.com/containerd/cri/pkg/store/sandbox" +) + +// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address. +func (c *criService) PortForward(ctx context.Context, r *runtime.PortForwardRequest) (retRes *runtime.PortForwardResponse, retErr error) { + sandbox, err := c.sandboxStore.Get(r.GetPodSandboxId()) + if err != nil { + return nil, errors.Wrapf(err, "failed to find sandbox %q", r.GetPodSandboxId()) + } + if sandbox.Status.Get().State != sandboxstore.StateReady { + return nil, errors.New("sandbox container is not running") + } + // TODO(random-liu): Verify that ports are exposed. + return c.streamServer.GetPortForward(r) +} diff --git a/pkg/server/sandbox_portforward_unix.go b/pkg/server/sandbox_portforward_unix.go index 26dbc4075..3b735ac36 100644 --- a/pkg/server/sandbox_portforward_unix.go +++ b/pkg/server/sandbox_portforward_unix.go @@ -31,23 +31,8 @@ import ( "github.com/sirupsen/logrus" "golang.org/x/net/context" runtime "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" - - sandboxstore "github.com/containerd/cri/pkg/store/sandbox" ) -// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address. -func (c *criService) PortForward(ctx context.Context, r *runtime.PortForwardRequest) (retRes *runtime.PortForwardResponse, retErr error) { - sandbox, err := c.sandboxStore.Get(r.GetPodSandboxId()) - if err != nil { - return nil, errors.Wrapf(err, "failed to find sandbox %q", r.GetPodSandboxId()) - } - if sandbox.Status.Get().State != sandboxstore.StateReady { - return nil, errors.New("sandbox container is not running") - } - // TODO(random-liu): Verify that ports are exposed. - return c.streamServer.GetPortForward(r) -} - // portForward requires `socat` on the node. It uses netns to enter the sandbox namespace, // and run `socat` inside the namespace to forward stream for a specific port. The `socat` // command keeps running until it exits or client disconnect. diff --git a/pkg/server/sandbox_portforward_windows.go b/pkg/server/sandbox_portforward_windows.go index 52ff9cd59..b16242062 100644 --- a/pkg/server/sandbox_portforward_windows.go +++ b/pkg/server/sandbox_portforward_windows.go @@ -19,19 +19,62 @@ limitations under the License. package server import ( + "bytes" + "fmt" "io" - "github.com/containerd/containerd/errdefs" + "github.com/pkg/errors" "golang.org/x/net/context" - runtime "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" + "k8s.io/utils/exec" + + "github.com/containerd/cri/pkg/ioutil" + sandboxstore "github.com/containerd/cri/pkg/store/sandbox" ) -// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address. -// TODO(windows): Implement this for windows. -func (c *criService) PortForward(ctx context.Context, r *runtime.PortForwardRequest) (*runtime.PortForwardResponse, error) { - return nil, errdefs.ErrNotImplemented +func (c *criService) portForward(ctx context.Context, id string, port int32, stream io.ReadWriter) error { + stdout := ioutil.NewNopWriteCloser(stream) + stderrBuffer := new(bytes.Buffer) + stderr := ioutil.NewNopWriteCloser(stderrBuffer) + // localhost is resolved to 127.0.0.1 in ipv4, and ::1 in ipv6. + // Explicitly using ipv4 IP address in here to avoid flakiness. + cmd := []string{"wincat.exe", "127.0.0.1", fmt.Sprint(port)} + err := c.execInSandbox(ctx, id, cmd, stream, stdout, stderr) + if err != nil { + return errors.Wrapf(err, "failed to execute port forward in sandbox: %s", stderrBuffer.String()) + } + return nil } -func (c *criService) portForward(ctx context.Context, id string, port int32, stream io.ReadWriter) error { - return errdefs.ErrNotImplemented +func (c *criService) execInSandbox(ctx context.Context, sandboxID string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser) error { + // Get sandbox from our sandbox store. + sb, err := c.sandboxStore.Get(sandboxID) + if err != nil { + return errors.Wrapf(err, "failed to find sandbox %q in store", sandboxID) + } + + // Check the sandbox state + state := sb.Status.Get().State + if state != sandboxstore.StateReady { + return errors.Errorf("sandbox is in %s state", fmt.Sprint(state)) + } + + opts := execOptions{ + cmd: cmd, + stdin: stdin, + stdout: stdout, + stderr: stderr, + tty: false, + resize: nil, + } + exitCode, err := c.execInternal(ctx, sb.Container, sandboxID, opts) + if err != nil { + return errors.Wrap(err, "failed to exec in sandbox") + } + if *exitCode == 0 { + return nil + } + return &exec.CodeExitError{ + Err: errors.Errorf("error executing command %v, exit code %d", cmd, *exitCode), + Code: int(*exitCode), + } } diff --git a/pkg/server/streaming.go b/pkg/server/streaming.go index e26647470..115fc9751 100644 --- a/pkg/server/streaming.go +++ b/pkg/server/streaming.go @@ -25,7 +25,6 @@ import ( "time" "github.com/pkg/errors" - "golang.org/x/net/context" k8snet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/remotecommand" @@ -156,7 +155,8 @@ func (s *streamRuntime) PortForward(podSandboxID string, port int32, stream io.R if port <= 0 || port > math.MaxUint16 { return errors.Errorf("invalid port %d", port) } - return s.c.portForward(context.Background(), podSandboxID, port, stream) + ctx := ctrdutil.NamespacedContext() + return s.c.portForward(ctx, podSandboxID, port, stream) } // handleResizing spawns a goroutine that processes the resize channel, calling resizeFunc for each diff --git a/test/windows/test.sh b/test/windows/test.sh index 9e2c2bb26..99331fe2d 100755 --- a/test/windows/test.sh +++ b/test/windows/test.sh @@ -21,7 +21,7 @@ set -o pipefail export PATH="/c/Program Files/Containerd:$PATH" REPO_TAR="${REPO_TAR:-"/c/cri.tar.gz"}" FOCUS="${FOCUS:-"Conformance"}" -SKIP="${SKIP:-"portforward"}" +SKIP="${SKIP:-""}" REPORT_DIR="${REPORT_DIR:-"/c/_artifacts"}" repo="$GOPATH/src/github.com/containerd/cri"